Skip to content

Commit

Permalink
fix some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Apr 8, 2024
1 parent f627e30 commit bb8d4a5
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 35 deletions.
3 changes: 2 additions & 1 deletion cpp/src/parquet/bloom_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ class PARQUET_EXPORT BloomFilter {
///
/// @param value the value to hash.
uint64_t Hash(const ByteArray& value) const { return Hash(&value); }

/// Compute hash for fixed byte array value by using its plain encoding result.
///
/// @param value the value to hash.
Expand All @@ -186,7 +187,7 @@ class PARQUET_EXPORT BloomFilter {
/// Compute hash for std::string_view value by using its plain encoding result.
///
/// @param value the value to hash.
uint64_t Hash(const std::string_view& value) const {
uint64_t Hash(std::string_view value) const {
ByteArray ba(value);
return Hash(&ba);
}
Expand Down
19 changes: 10 additions & 9 deletions cpp/src/parquet/bloom_filter_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,20 @@

namespace parquet {

/// Column encryption for bloom filter is not implemented yet.
class BloomFilterBuilderImpl : public BloomFilterBuilder {
public:
explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
WriterProperties properties)
: schema_(schema), properties_(std::move(properties)) {}
const WriterProperties* properties)
: schema_(schema), properties_(properties) {}
/// Append a new row group to host all incoming bloom filters.
void AppendRowGroup() override;

BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;

/// Serialize all bloom filters with header and bitset in the order of row group and
/// column id. Column encryption is not implemented yet. The side effect is that it
/// deletes all bloom filters after they have been flushed.
/// column id. The side effect is that it deletes all bloom filters after they have
/// been flushed.
void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) override;

BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
Expand All @@ -70,7 +71,7 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder {
}

const SchemaDescriptor* schema_;
WriterProperties properties_;
const WriterProperties* properties_;
bool finished_ = false;

using RowGroupBloomFilters = std::map<int32_t, std::unique_ptr<BloomFilter>>;
Expand All @@ -81,7 +82,7 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder {
};

std::unique_ptr<BloomFilterBuilder> BloomFilterBuilder::Make(
const SchemaDescriptor* schema, const WriterProperties& properties) {
const SchemaDescriptor* schema, const WriterProperties* properties) {
return std::make_unique<BloomFilterBuilderImpl>(schema, properties);
}

Expand All @@ -97,7 +98,7 @@ BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t column_ordin
CheckState(column_ordinal);
const ColumnDescriptor* column_descr = schema_->Column(column_ordinal);
DCHECK_NE(column_descr->physical_type(), Type::BOOLEAN);
auto bloom_filter_options_opt = properties_.bloom_filter_options(column_descr->path());
auto bloom_filter_options_opt = properties_->bloom_filter_options(column_descr->path());
if (bloom_filter_options_opt == std::nullopt) {
return nullptr;
}
Expand All @@ -106,12 +107,12 @@ BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t column_ordin
auto iter = row_group_bloom_filter.find(column_ordinal);
if (iter == row_group_bloom_filter.end()) {
auto block_split_bloom_filter =
std::make_unique<BlockSplitBloomFilter>(properties_.memory_pool());
std::make_unique<BlockSplitBloomFilter>(properties_->memory_pool());
block_split_bloom_filter->Init(BlockSplitBloomFilter::OptimalNumOfBytes(
bloom_filter_options.ndv, bloom_filter_options.fpp));
auto insert_result = row_group_bloom_filter.emplace(
column_ordinal, std::move(block_split_bloom_filter));
ARROW_CHECK(insert_result.second);
DCHECK(insert_result.second);
iter = insert_result.first;
}
ARROW_CHECK(iter->second != nullptr);
Expand Down
18 changes: 8 additions & 10 deletions cpp/src/parquet/bloom_filter_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,13 @@ class PARQUET_EXPORT BloomFilterBuilder {
public:
/// \brief API to create a BloomFilterBuilder.
static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor* schema,
const WriterProperties& properties);
const WriterProperties* properties);

/// Append a new row group to host all incoming bloom filters.
///
/// This method must be called before `GetOrCreateBloomFilter`
/// in a row group.
/// This method must be called before `GetOrCreateBloomFilter` for a new row group.
///
/// \throws ParquetException It will throw an exception if the BloomFilter already
/// called `WriteTo`.
/// \throws ParquetException if WriteTo() has been called to flush bloom filters.
virtual void AppendRowGroup() = 0;

/// \brief Get the BloomFilter from column ordinal.
Expand All @@ -63,9 +61,10 @@ class PARQUET_EXPORT BloomFilterBuilder {
/// BloomFilterBuilder. It will return nullptr if bloom filter is not enabled for the
/// column.
///
/// \throws ParquetException It will throw an exception if the BloomFilter already
/// called `WriteTo`, column_ordinal is out of bound, or without calling
/// `AppendRowGroup` before `GetOrCreateBloomFilter`.
/// \throws ParquetException if any of following conditions applies:
/// 1) column_ordinal is out of bound.
/// 2) `WriteTo()` has been called already.
/// 3) `AppendRowGroup()` is not called before `GetOrCreateBloomFilter()`.
virtual BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) = 0;

/// \brief Write the bloom filter to sink.
Expand All @@ -75,8 +74,7 @@ class PARQUET_EXPORT BloomFilterBuilder {
/// \param[out] sink The output stream to write the bloom filter.
/// \param[out] location The location of all bloom filter relative to the start of sink.
///
/// \throws ParquetException It will throw an exception if the BloomFilter already
/// called `WriteTo`.
/// \throws ParquetException if WriteTo() has been called to flush bloom filters.
virtual void WriteTo(::arrow::io::OutputStream* sink,
BloomFilterLocation* location) = 0;

Expand Down
27 changes: 20 additions & 7 deletions cpp/src/parquet/bloom_filter_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ TEST(BloomFilterBuilderTest, BasicRoundTrip) {
bloom_filter_options.ndv = 100;
properties_builder.enable_bloom_filter_options(bloom_filter_options, "c1");
auto writer_properties = properties_builder.build();
auto builder = BloomFilterBuilder::Make(&schema, *writer_properties);
auto builder = BloomFilterBuilder::Make(&schema, writer_properties.get());

auto append_values_to_bloom_filter = [&](const std::vector<uint64_t>& insert_hashes) {
builder->AppendRowGroup();
Expand Down Expand Up @@ -164,15 +164,25 @@ TEST(BloomFilterBuilderTest, InvalidOperations) {
properties_builder.enable_bloom_filter_options(bloom_filter_options, "c1");
properties_builder.enable_bloom_filter_options(bloom_filter_options, "c2");
auto properties = properties_builder.build();
auto builder = BloomFilterBuilder::Make(&schema, *properties);
auto builder = BloomFilterBuilder::Make(&schema, properties.get());
// AppendRowGroup() is not called and expect throw.
ASSERT_THROW(builder->GetOrCreateBloomFilter(0), ParquetException);
EXPECT_THROW_THAT(
[&]() { builder->GetOrCreateBloomFilter(0); }, ParquetException,
::testing::Property(
&ParquetException::what,
::testing::HasSubstr("No row group appended to BloomFilterBuilder")));

builder->AppendRowGroup();
// GetOrCreateBloomFilter() with wrong column ordinal expect throw.
ASSERT_THROW(builder->GetOrCreateBloomFilter(2), ParquetException);
EXPECT_THROW_THAT([&]() { builder->GetOrCreateBloomFilter(2); }, ParquetException,
::testing::Property(&ParquetException::what,
::testing::HasSubstr("Invalid column ordinal")));
// GetOrCreateBloomFilter() with boolean expect throw.
ASSERT_THROW(builder->GetOrCreateBloomFilter(1), ParquetException);
EXPECT_THROW_THAT(
[&]() { builder->GetOrCreateBloomFilter(1); }, ParquetException,
::testing::Property(
&ParquetException::what,
::testing::HasSubstr("BloomFilterBuilder does not support boolean type")));
auto filter = builder->GetOrCreateBloomFilter(0);
// Call GetOrCreateBloomFilter the second time it is actually a cached version.
EXPECT_EQ(filter, builder->GetOrCreateBloomFilter(0));
Expand All @@ -181,7 +191,10 @@ TEST(BloomFilterBuilderTest, InvalidOperations) {
builder->WriteTo(sink.get(), &location);
EXPECT_EQ(1, location.bloom_filter_location.size());
// Multiple WriteTo() expect throw.
ASSERT_THROW(builder->WriteTo(sink.get(), &location), ParquetException);
EXPECT_THROW_THAT(
[&]() { builder->WriteTo(sink.get(), &location); }, ParquetException,
::testing::Property(&ParquetException::what,
::testing::HasSubstr("Cannot call WriteTo() multiple times")));
}

TEST(BloomFilterBuilderTest, GetOrCreate) {
Expand All @@ -195,7 +208,7 @@ TEST(BloomFilterBuilderTest, GetOrCreate) {
properties_builder.enable_bloom_filter_options(bloom_filter_options, "c1");
properties_builder.enable_bloom_filter_options(bloom_filter_options, "c2");
auto properties = properties_builder.build();
auto builder = BloomFilterBuilder::Make(&schema, *properties);
auto builder = BloomFilterBuilder::Make(&schema, properties.get());
// AppendRowGroup() is not called and expect throw.
ASSERT_THROW(builder->GetOrCreateBloomFilter(0), ParquetException);

Expand Down
3 changes: 1 addition & 2 deletions cpp/src/parquet/column_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1736,7 +1736,6 @@ TestBloomFilterWriter<TestType>::BuildWriterWithBloomFilter(
wp_builder.disable_dictionary();
wp_builder.encoding(column_properties.encoding());
}
wp_builder.max_statistics_size(column_properties.max_statistics_size());
auto path = this->schema_.Column(0)->path();
if (column_properties.bloom_filter_enabled()) {
wp_builder.enable_bloom_filter_options(
Expand All @@ -1750,7 +1749,7 @@ TestBloomFilterWriter<TestType>::BuildWriterWithBloomFilter(
ColumnChunkMetaDataBuilder::Make(this->writer_properties_, this->descr_);
std::unique_ptr<PageWriter> pager = PageWriter::Open(
this->sink_, column_properties.compression(), this->metadata_.get());
builder_ = BloomFilterBuilder::Make(&this->schema_, *this->writer_properties_);
builder_ = BloomFilterBuilder::Make(&this->schema_, this->writer_properties_.get());
// Initial RowGroup
builder_->AppendRowGroup();
bloom_filter_ = builder_->GetOrCreateBloomFilter(0);
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/parquet/file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
if (properties_->file_encryption_properties()) {
ParquetException::NYI("Encryption is not supported with bloom filter");
}
// Serialize page index after all row groups have been written and report
// Serialize bloom filter after all row groups have been written and report
// location to the file metadata.
BloomFilterLocation bloom_filter_location;
bloom_filter_builder_->WriteTo(sink_.get(), &bloom_filter_location);
Expand Down Expand Up @@ -533,7 +533,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
}
}
if (properties_->bloom_filter_enabled()) {
bloom_filter_builder_ = BloomFilterBuilder::Make(schema(), *properties_);
bloom_filter_builder_ = BloomFilterBuilder::Make(schema(), properties_.get());
}
if (properties_->page_index_enabled()) {
page_index_builder_ = PageIndexBuilder::Make(&schema_, file_encryptor_.get());
Expand Down
4 changes: 0 additions & 4 deletions cpp/src/parquet/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,6 @@ class ArrowWriterPropertiesBuilder;

class BloomFilter;

namespace schema {
class ColumnPath;
} // namespace schema

namespace arrow {

class FileWriter;
Expand Down

0 comments on commit bb8d4a5

Please sign in to comment.