Skip to content

Commit

Permalink
testing flush policy
Browse files Browse the repository at this point in the history
  • Loading branch information
svm1 committed Apr 26, 2024
1 parent ecd3b4f commit 0e640eb
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 18 deletions.
37 changes: 19 additions & 18 deletions velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,47 +138,49 @@ TEST_F(ParquetWriterTest, compression) {

TEST_F(ParquetWriterTest, flushPolicyFactory) {
auto schema =
ROW({"c0", "c1", "c2", "c3", "c4", "c5", "c6"},
ROW({"c0", "c1"},
{INTEGER(),
DOUBLE(),
BIGINT(),
INTEGER(),
BIGINT(),
INTEGER(),
DOUBLE()});
const int64_t kRows = 10'000;
int64_t kRows = 100;
const auto data = makeRowVector({
makeFlatVector<int32_t>(kRows, [](auto row) { return row + 5; }),
makeFlatVector<double>(kRows, [](auto row) { return row - 10; }),
makeFlatVector<int64_t>(kRows, [](auto row) { return row - 15; }),
makeFlatVector<uint32_t>(kRows, [](auto row) { return row + 20; }),
makeFlatVector<uint64_t>(kRows, [](auto row) { return row + 25; }),
makeFlatVector<int32_t>(kRows, [](auto row) { return row + 30; }),
makeFlatVector<double>(kRows, [](auto row) { return row - 25; }),
makeFlatVector<double>(kRows, [](auto row) { return row - 10; })
});

// Create an in-memory writer
auto sink = std::make_unique<MemorySink>(
200 * 1024 * 1024,
512 * 1024 * 1024,
dwio::common::FileSink::Options{.pool = leafPool_.get()});
auto sinkPtr = sink.get();
facebook::velox::parquet::WriterOptions writerOptions;
writerOptions.memoryPool = leafPool_.get();
writerOptions.flushPolicyFactory = [&]() {
return std::make_unique<LambdaFlushPolicy>(1'024 * 1'024, 128 * 1'024 * 1'024,[]() {
return true; // Flushes every batch.
return std::make_unique<LambdaFlushPolicy>(100, 128 * 1024 * 1024,[]() {
return false; // memory-based flush. expecting flush after writing 100 rows
});
};

auto writer = std::make_unique<facebook::velox::parquet::Writer>(
std::move(sink), writerOptions, rootPool_, schema);

writer->write(data);
EXPECT_TRUE(writer->shouldFlush());

kRows = 99;
const auto data2 = makeRowVector({
makeFlatVector<int32_t>(kRows, [](auto row) { return row + 5; }),
makeFlatVector<double>(kRows, [](auto row) { return row - 10; })
});

writer->write(data2);
EXPECT_FALSE(writer->shouldFlush());

writer->close();

dwio::common::ReaderOptions readerOptions{leafPool_.get()};
auto reader = createReaderInMemory(*sinkPtr, readerOptions);

ASSERT_EQ(reader->numberOfRows(), kRows);
ASSERT_EQ(reader->numberOfRows(), 199);
ASSERT_EQ(*reader->rowType(), *schema);

auto rowReader = createRowReaderWithSchema(std::move(reader), schema);
Expand Down Expand Up @@ -213,7 +215,6 @@ TEST_F(ParquetWriterTest, encoding) {
auto sinkPtr = sink.get();
facebook::velox::parquet::WriterOptions writerOptions;
writerOptions.memoryPool = leafPool_.get();
writerOptions.encoding = facebook::velox::parquet::arrow::Encoding::BIT_PACKED;

auto writer = std::make_unique<facebook::velox::parquet::Writer>(
std::move(sink), writerOptions, rootPool_, schema);
Expand Down
8 changes: 8 additions & 0 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,14 @@ void Writer::abort() {
arrowContext_.reset();
}

// Asks the configured flush policy whether a flush is due, based on the
// stripe progress computed from the Arrow context's staging row and byte
// counters. Returns the policy's answer unchanged.
bool Writer::shouldFlush() {
  return flushPolicy_->shouldFlush(getStripeProgress(
      arrowContext_->stagingRows, arrowContext_->stagingBytes));
}

parquet::WriterOptions getParquetOptions(
const dwio::common::WriterOptions& options) {
parquet::WriterOptions parquetOptions;
Expand Down
2 changes: 2 additions & 0 deletions velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ class Writer : public dwio::common::Writer {

void abort() override;

// Returns true when the writer's flush policy reports that a flush is due for
// the rows and bytes currently tracked as staged; false otherwise.
bool shouldFlush();

private:
// Sets the memory reclaimers for all the memory pools used by this writer.
void setMemoryReclaimers();
Expand Down

0 comments on commit 0e640eb

Please sign in to comment.