Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Jul 3, 2024
1 parent 70e3508 commit c587568
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 27 deletions.
59 changes: 46 additions & 13 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5763,8 +5763,8 @@ class ParquetBloomFilterRoundTripTest : public ::testing::Test,
}

template <typename ArrowType>
void VerifyBloomFilter(const BloomFilter* bloom_filter,
const ::arrow::ChunkedArray& chunked_array) {
void VerifyBloomFilterContains(const BloomFilter* bloom_filter,
const ::arrow::ChunkedArray& chunked_array) {
for (auto value : ::arrow::stl::Iterate<ArrowType>(chunked_array)) {
if (value == std::nullopt) {
continue;
Expand All @@ -5773,6 +5773,17 @@ class ParquetBloomFilterRoundTripTest : public ::testing::Test,
}
}

template <typename ArrowType>
void VerifyBloomFilterNotContains(const BloomFilter* bloom_filter,
                                  const ::arrow::ChunkedArray& chunked_array) {
  // Expect that none of the non-null values in `chunked_array` are reported
  // as present by `bloom_filter` (the filter was built from different data,
  // e.g. another row group). Nulls are skipped since they are never hashed
  // into the filter.
  for (const auto& maybe_value : ::arrow::stl::Iterate<ArrowType>(chunked_array)) {
    if (!maybe_value.has_value()) {
      continue;
    }
    const auto hash = bloom_filter->Hash(maybe_value.value());
    EXPECT_FALSE(bloom_filter->FindHash(hash));
  }
}

protected:
std::vector<std::unique_ptr<BloomFilter>> bloom_filters_;
};
Expand All @@ -5781,7 +5792,7 @@ TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTrip) {
auto schema = ::arrow::schema(
{::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
BloomFilterOptions options;
options.ndv = 100;
options.ndv = 10;
auto writer_properties = WriterProperties::Builder()
.enable_bloom_filter_options(options, "c0")
->enable_bloom_filter_options(options, "c1")
Expand All @@ -5804,16 +5815,26 @@ TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTrip) {
int64_t bloom_filter_idx = 0; // current index in `bloom_filters_`
for (int64_t row_group_id = 0; row_group_id < 2; ++row_group_id) {
{
// The bloom filter built from the same column in the other row group.
int64_t bloom_filter_idx_another_rg =
row_group_id == 0 ? bloom_filter_idx + 2 : bloom_filter_idx - 2;
ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]);
auto col = table->column(0)->Slice(current_row, row_group_row_count[row_group_id]);
VerifyBloomFilter<::arrow::Int64Type>(bloom_filters_[bloom_filter_idx].get(), *col);
VerifyBloomFilterContains<::arrow::Int64Type>(
bloom_filters_[bloom_filter_idx].get(), *col);
VerifyBloomFilterNotContains<::arrow::Int64Type>(
bloom_filters_[bloom_filter_idx_another_rg].get(), *col);
++bloom_filter_idx;
}
{
int64_t bloom_filter_idx_another_rg =
row_group_id == 0 ? bloom_filter_idx + 2 : bloom_filter_idx - 2;
ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]);
auto col = table->column(1)->Slice(current_row, row_group_row_count[row_group_id]);
VerifyBloomFilter<::arrow::StringType>(bloom_filters_[bloom_filter_idx].get(),
*col);
VerifyBloomFilterContains<::arrow::StringType>(
bloom_filters_[bloom_filter_idx].get(), *col);
VerifyBloomFilterNotContains<::arrow::StringType>(
bloom_filters_[bloom_filter_idx_another_rg].get(), *col);
++bloom_filter_idx;
}
current_row += row_group_row_count[row_group_id];
Expand All @@ -5828,7 +5849,7 @@ TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripDictionary) {
::arrow::field("c1", ::arrow::dictionary(::arrow::int64(), ::arrow::utf8()))});
bloom_filters_.clear();
BloomFilterOptions options;
options.ndv = 100;
options.ndv = 10;
auto writer_properties = WriterProperties::Builder()
.enable_bloom_filter_options(options, "c0")
->enable_bloom_filter_options(options, "c1")
Expand All @@ -5843,6 +5864,7 @@ TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripDictionary) {
[6, "f"]
])"};
auto table = ::arrow::TableFromJSON(schema, contents);
// Use non_dict_table to adapt to interfaces that don't support dictionary types.
auto non_dict_table = ::arrow::TableFromJSON(origin_schema, contents);
WriteFile(writer_properties, table);

Expand All @@ -5853,18 +5875,28 @@ TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripDictionary) {
int64_t bloom_filter_idx = 0; // current index in `bloom_filters_`
for (int64_t row_group_id = 0; row_group_id < 2; ++row_group_id) {
{
// The bloom filter built from the same column in the other row group.
int64_t bloom_filter_idx_another_rg =
row_group_id == 0 ? bloom_filter_idx + 2 : bloom_filter_idx - 2;
ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]);
auto col = non_dict_table->column(0)->Slice(current_row,
row_group_row_count[row_group_id]);
VerifyBloomFilter<::arrow::Int64Type>(bloom_filters_[bloom_filter_idx].get(), *col);
VerifyBloomFilterContains<::arrow::Int64Type>(
bloom_filters_[bloom_filter_idx].get(), *col);
VerifyBloomFilterNotContains<::arrow::Int64Type>(
bloom_filters_[bloom_filter_idx_another_rg].get(), *col);
++bloom_filter_idx;
}
{
int64_t bloom_filter_idx_another_rg =
row_group_id == 0 ? bloom_filter_idx + 2 : bloom_filter_idx - 2;
ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]);
auto col = non_dict_table->column(1)->Slice(current_row,
row_group_row_count[row_group_id]);
VerifyBloomFilter<::arrow::StringType>(bloom_filters_[bloom_filter_idx].get(),
*col);
VerifyBloomFilterContains<::arrow::StringType>(
bloom_filters_[bloom_filter_idx].get(), *col);
VerifyBloomFilterNotContains<::arrow::StringType>(
bloom_filters_[bloom_filter_idx_another_rg].get(), *col);
++bloom_filter_idx;
}
current_row += row_group_row_count[row_group_id];
Expand All @@ -5875,7 +5907,7 @@ TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripWithOneFilter) {
auto schema = ::arrow::schema(
{::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
BloomFilterOptions options;
options.ndv = 100;
options.ndv = 10;
auto writer_properties = WriterProperties::Builder()
.enable_bloom_filter_options(options, "c0")
->disable_bloom_filter("c1")
Expand All @@ -5900,7 +5932,8 @@ TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripWithOneFilter) {
{
ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]);
auto col = table->column(0)->Slice(current_row, row_group_row_count[row_group_id]);
VerifyBloomFilter<::arrow::Int64Type>(bloom_filters_[bloom_filter_idx].get(), *col);
VerifyBloomFilterContains<::arrow::Int64Type>(
bloom_filters_[bloom_filter_idx].get(), *col);
++bloom_filter_idx;
}
current_row += row_group_row_count[row_group_id];
Expand All @@ -5910,7 +5943,7 @@ TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripWithOneFilter) {
TEST_F(ParquetBloomFilterRoundTripTest, ThrowForBoolean) {
auto schema = ::arrow::schema({::arrow::field("boolean_col", ::arrow::boolean())});
BloomFilterOptions options;
options.ndv = 100;
options.ndv = 10;
auto writer_properties = WriterProperties::Builder()
.enable_bloom_filter_options(options, "boolean_col")
->max_row_group_length(4)
Expand Down
29 changes: 17 additions & 12 deletions cpp/src/parquet/bloom_filter_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder {
explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
const WriterProperties* properties)
: schema_(schema), properties_(properties) {}
BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;

/// Append a new row group to host all incoming bloom filters.
void AppendRowGroup() override;

Expand All @@ -51,14 +54,11 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder {
/// been flushed.
void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) override;

BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;

private:
/// Make sure column ordinal is not out of bound and the builder is in good state.
void CheckState(int32_t column_ordinal) const {
if (finished_) {
throw ParquetException("BloomFilterBuilder is already finished.");
throw ParquetException("Cannot call WriteTo() twice on BloomFilterBuilder.");
}
if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
throw ParquetException("Invalid column ordinal: ", column_ordinal);
Expand All @@ -85,20 +85,24 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder {
/// Append a new row group to host all incoming bloom filters.
///
/// \throws ParquetException if WriteTo() has already been called: the builder
/// is sealed at that point, and bloom filters appended afterwards would never
/// be serialized.
void BloomFilterBuilderImpl::AppendRowGroup() {
  if (finished_) {
    // NOTE: the original rendered both the old and the new message string on
    // adjacent lines; in C++ adjacent string literals concatenate, which would
    // produce a doubled, garbled exception message. Keep exactly one message.
    throw ParquetException(
        "Cannot call AppendRowGroup() after BloomFilterBuilder::WriteTo() is called");
  }
  file_bloom_filters_.emplace_back(std::make_unique<RowGroupBloomFilters>());
}

BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t column_ordinal) {
CheckState(column_ordinal);
const ColumnDescriptor* column_descr = schema_->Column(column_ordinal);
// Bloom filter does not support boolean type, and this should be checked in
// CheckState() already.
DCHECK_NE(column_descr->physical_type(), Type::BOOLEAN);
auto bloom_filter_options_opt = properties_->bloom_filter_options(column_descr->path());
if (bloom_filter_options_opt == std::nullopt) {
return nullptr;
}
BloomFilterOptions bloom_filter_options = *bloom_filter_options_opt;
// CheckState() should have checked that file_bloom_filters_ is not empty.
DCHECK(!file_bloom_filters_.empty());
RowGroupBloomFilters& row_group_bloom_filter = *file_bloom_filters_.back();
auto iter = row_group_bloom_filter.find(column_ordinal);
if (iter == row_group_bloom_filter.end()) {
Expand All @@ -111,7 +115,10 @@ BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t column_ordin
DCHECK(insert_result.second);
iter = insert_result.first;
}
ARROW_CHECK(iter->second != nullptr);
if (iter->second == nullptr) {
throw ParquetException("Bloom filter state is invalid for column ",
column_descr->path());
}
return iter->second.get();
}

Expand All @@ -130,22 +137,20 @@ void BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink,
if (row_group_bloom_filters.empty()) {
continue;
}
bool has_valid_bloom_filter = false;
int num_columns = schema_->num_columns();
std::vector<std::optional<IndexLocation>> locations(num_columns, std::nullopt);

// serialize bloom filter in ascending order of column id
for (auto& [column_id, filter] : row_group_bloom_filters) {
ARROW_CHECK(filter != nullptr);
if (ARROW_PREDICT_FALSE(filter == nullptr)) {
throw ParquetException("Bloom filter state is invalid for column ", column_id);
}
PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell());
filter->WriteTo(sink);
PARQUET_ASSIGN_OR_THROW(int64_t pos, sink->Tell());
has_valid_bloom_filter = true;
locations[column_id] = IndexLocation{offset, static_cast<int32_t>(pos - offset)};
}
if (has_valid_bloom_filter) {
location->bloom_filter_location.emplace(row_group_ordinal, std::move(locations));
}
location->bloom_filter_location.emplace(row_group_ordinal, std::move(locations));
}
}
} // namespace
Expand Down
10 changes: 8 additions & 2 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2446,6 +2446,8 @@ template <>
void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilterSpaced(const bool*, int64_t,
const uint8_t*,
int64_t) {
// Bloom filters are not supported for BOOLEAN columns, so the builder never
// creates one for this column type and bloom_filter_ should always be
// nullptr here; there is nothing to update.
DCHECK(bloom_filter_ == nullptr);
}

Expand Down Expand Up @@ -2504,7 +2506,7 @@ void TypedColumnWriterImpl<ByteArrayType>::UpdateBloomFilterArray(
const ::arrow::Array& values) {
if (bloom_filter_) {
// TODO(mwish): GH-37832 currently we don't support writing StringView/BinaryView to
// parquet file.
if (!::arrow::is_base_binary_like(values.type_id())) {
throw ParquetException("Only BaseBinaryArray and subclasses supported");
}
Expand All @@ -2513,7 +2515,11 @@ void TypedColumnWriterImpl<ByteArrayType>::UpdateBloomFilterArray(
UpdateBinaryBloomFilter(bloom_filter_,
checked_cast<const ::arrow::BinaryArray&>(values));
} else {
DCHECK(::arrow::is_large_binary_like(values.type_id()));
// TODO(mwish): GH-37832 currently we don't support write StringView/BinaryView to
// parquet file.
if (!::arrow::is_large_binary_like(values.type_id())) {
throw ParquetException("Only LargeBinaryArray and subclasses supported");
}
UpdateBinaryBloomFilter(bloom_filter_,
checked_cast<const ::arrow::LargeBinaryArray&>(values));
}
Expand Down

0 comments on commit c587568

Please sign in to comment.