Skip to content

Commit

Permalink
Fix multiple files spilled for the distinct hash table (facebookincub…
Browse files Browse the repository at this point in the history
…ator#9230)

Summary:
The existing distinct aggregation implementation assumes that there is one file generated
for each spill run. And use stream id 0 to detect if a row read from spilled file is distinct one
or not. This is no longer true after we add support to configure the max number of rows to
spill in each sorted spill file for aggregation which means stream id > 0 could also contains
the distinct values. This will cause incorrect data result and reported by [issue](facebookincubator#9219).

This PR fixes this issue by recording the number of spilled files on the first spill in grouping
set to detect the spilled files that contain the seen distinct values. Unit test is added to reproduce
and verify the fix. Also removed the unused spill config

Pull Request resolved: facebookincubator#9230

Reviewed By: oerling

Differential Revision: D55288249

Pulled By: xiaoxmeng

fbshipit-source-id: 0b96263ea3c08d8e5bd9e210f77547d642c2f2db
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Mar 25, 2024
1 parent f5dbf1e commit 5c67de4
Show file tree
Hide file tree
Showing 17 changed files with 137 additions and 122 deletions.
2 changes: 0 additions & 2 deletions velox/common/base/SpillConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ SpillConfig::SpillConfig(
std::string _fileNamePrefix,
uint64_t _maxFileSize,
uint64_t _writeBufferSize,
uint64_t _minSpillRunSize,
folly::Executor* _executor,
int32_t _minSpillableReservationPct,
int32_t _spillableReservationGrowthPct,
Expand All @@ -42,7 +41,6 @@ SpillConfig::SpillConfig(
_maxFileSize == 0 ? std::numeric_limits<int64_t>::max()
: _maxFileSize),
writeBufferSize(_writeBufferSize),
minSpillRunSize(_minSpillRunSize),
executor(_executor),
minSpillableReservationPct(_minSpillableReservationPct),
spillableReservationGrowthPct(_spillableReservationGrowthPct),
Expand Down
10 changes: 0 additions & 10 deletions velox/common/base/SpillConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ struct SpillConfig {
std::string _filePath,
uint64_t _maxFileSize,
uint64_t _writeBufferSize,
uint64_t _minSpillRunSize,
folly::Executor* _executor,
int32_t _minSpillableReservationPct,
int32_t _spillableReservationGrowthPct,
Expand Down Expand Up @@ -94,15 +93,6 @@ struct SpillConfig {
/// storage system for io efficiency.
uint64_t writeBufferSize;

/// The min spill run size (bytes) limit used to select partitions for
/// spilling. The spiller tries to spill a previously spilled partitions if
/// its data size exceeds this limit, otherwise it spills the partition with
/// most data. If the limit is zero, then the spiller always spill a
/// previously spilled partition if it has any data. This is to avoid spill
/// from a partition with a small amount of data which might result in
/// generating too many small spilled files.
uint64_t minSpillRunSize;

/// Executor for spilling. If nullptr spilling writes on the Driver's thread.
folly::Executor* executor; // Not owned.

Expand Down
3 changes: 0 additions & 3 deletions velox/common/base/tests/SpillConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ TEST(SpillConfig, spillLevel) {
"fakeSpillPath",
0,
0,
0,
nullptr,
0,
0,
Expand Down Expand Up @@ -116,7 +115,6 @@ TEST(SpillConfig, spillLevelLimit) {
"fakeSpillPath",
0,
0,
0,
nullptr,
0,
0,
Expand Down Expand Up @@ -163,7 +161,6 @@ TEST(SpillConfig, spillableReservationPercentages) {
"spillableReservationPercentages",
0,
0,
0,
nullptr,
testData.minPct,
testData.growthPct,
Expand Down
1 change: 0 additions & 1 deletion velox/connectors/hive/tests/HiveDataSinkTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase {
"",
0,
0,
0,
spillExecutor_.get(),
10,
20,
Expand Down
1 change: 0 additions & 1 deletion velox/dwio/dwrf/test/E2EWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ class E2EWriterTest : public testing::Test {
"fakeSpillConfig",
0,
0,
0,
nullptr,
minSpillableReservationPct,
spillableReservationGrowthPct,
Expand Down
1 change: 0 additions & 1 deletion velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ std::optional<common::SpillConfig> DriverCtx::makeSpillConfig(
spillFilePrefix,
queryConfig.maxSpillFileSize(),
queryConfig.spillWriteBufferSize(),
queryConfig.minSpillRunSize(),
task->queryCtx()->spillExecutor(),
queryConfig.minSpillableReservationPct(),
queryConfig.spillableReservationGrowthPct(),
Expand Down
23 changes: 16 additions & 7 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,7 @@ void GroupingSet::spill() {
if (!hasSpilled()) {
auto rows = table_->rows();
VELOX_DCHECK(pool_.trackUsage());
VELOX_CHECK_EQ(numDistinctSpilledFiles_, 0);
spiller_ = std::make_unique<Spiller>(
Spiller::Type::kAggregateInput,
rows,
Expand All @@ -944,8 +945,13 @@ void GroupingSet::spill() {
std::vector<CompareFlags>(),
spillConfig_,
spillStats_);
VELOX_CHECK_EQ(spiller_->state().maxPartitions(), 1);
}
spiller_->spill();
if (isDistinct() && numDistinctSpilledFiles_ == 0) {
numDistinctSpilledFiles_ = spiller_->state().numFinishedFiles(0);
VELOX_CHECK_GT(numDistinctSpilledFiles_, 0);
}
if (sortedAggregations_) {
sortedAggregations_->clear();
}
Expand Down Expand Up @@ -1064,16 +1070,19 @@ bool GroupingSet::mergeNextWithoutAggregates(
const RowVectorPtr& result) {
VELOX_CHECK_NOT_NULL(merge_);
VELOX_CHECK(isDistinct());
VELOX_CHECK_GT(numDistinctSpilledFiles_, 0);

// We are looping over sorted rows produced by tree-of-losers. We logically
// split the stream into runs of duplicate rows. As we process each run we
// track whether one of the values comes from stream 0, in which case we
// should not produce a result from that run. Otherwise, we produce a result
// at the end of the run (when we know for sure whether the run contains a row
// from stream 0 or not).
// track whether one of the values coming from distinct streams, in which case
// we should not produce a result from that run. Otherwise, we produce a
// result at the end of the run (when we know for sure whether the run
// contains a row from the distinct streams).
//
// NOTE: stream 0 contains rows which has already been output as distinct
// before we trigger spilling.
// NOTE: the distinct stream refers to the stream that contains the spilled
// distinct hash table. A distinct stream contains rows which has already
// been output as distinct before we trigger spilling. A distinct stream id is
// less than 'numDistinctSpilledFiles_'.
bool newDistinct{true};
int32_t numOutputRows{0};
while (numOutputRows < maxOutputRows) {
Expand All @@ -1082,7 +1091,7 @@ bool GroupingSet::mergeNextWithoutAggregates(
if (stream == nullptr) {
break;
}
if (stream->id() == 0) {
if (stream->id() < numDistinctSpilledFiles_) {
newDistinct = false;
}
if (next.second) {
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/GroupingSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,10 @@ class GroupingSet {
bool remainingMayPushdown_;

std::unique_ptr<Spiller> spiller_;
// Sets to the number of files stores the spilled distinct hash table which
// are the files generated by the first spill call. This only applies for
// distinct hash aggregation.
size_t numDistinctSpilledFiles_{0};
std::unique_ptr<TreeOfLosers<SpillMergeStream>> merge_;

// Container for materializing batches of output from spilling.
Expand Down
11 changes: 11 additions & 0 deletions velox/exec/Spill.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,17 @@ void SpillState::finishFile(uint32_t partition) {
writer->finishFile();
}

size_t SpillState::numFinishedFiles(uint32_t partition) const {
if (!isPartitionSpilled(partition)) {
return 0;
}
const auto* writer = partitionWriter(partition);
if (writer == nullptr) {
return 0;
}
return writer->numFinishedFiles();
}

SpillFiles SpillState::finish(uint32_t partition) {
auto* writer = partitionWriter(partition);
if (writer == nullptr) {
Expand Down
6 changes: 6 additions & 0 deletions velox/exec/Spill.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,12 @@ class SpillState {
/// far.
void finishFile(uint32_t partition);

/// Returns the current number of finished files from a given partition.
///
/// NOTE: the fucntion returns zero if the state has finished or the partition
/// is not spilled yet.
size_t numFinishedFiles(uint32_t partition) const;

/// Returns the spill file objects from a given 'partition'. The function
/// returns an empty list if either the partition has not been spilled or has
/// no spilled data.
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/SpillFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ void SpillWriter::closeFile() {
currentFile_.reset();
}

size_t SpillWriter::numFinishedFiles() const {
return finishedFiles_.size();
}

uint64_t SpillWriter::flush() {
if (batch_ == nullptr) {
return 0;
Expand Down
3 changes: 3 additions & 0 deletions velox/exec/SpillFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ class SpillWriter {
/// start a new one.
void finishFile();

/// Returns the number of current finished files.
size_t numFinishedFiles() const;

/// Finishes this file writer and returns the written spill files info.
///
/// NOTE: we don't allow write to a spill writer after t
Expand Down
10 changes: 8 additions & 2 deletions velox/exec/fuzzer/AggregationFuzzerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ DEFINE_bool(

namespace facebook::velox::exec::test {

int32_t AggregationFuzzerBase::randInt(int32_t min, int32_t max) {
return boost::random::uniform_int_distribution<int32_t>(min, max)(rng_);
}

bool AggregationFuzzerBase::isSupportedType(const TypePtr& type) const {
// Date / IntervalDayTime/ Unknown are not currently supported by DWRF.
if (type->isDate() || type->isIntervalDayTime() || type->isUnKnown()) {
Expand Down Expand Up @@ -403,8 +407,10 @@ velox::test::ResultOrError AggregationFuzzerBase::execute(
spillDirectory = exec::test::TempDirectoryPath::create();
builder.spillDirectory(spillDirectory->path)
.config(core::QueryConfig::kSpillEnabled, "true")
.config(core::QueryConfig::kAggregationSpillEnabled, "true");
spillPct = 100;
.config(core::QueryConfig::kAggregationSpillEnabled, "true")
.config(core::QueryConfig::kMaxSpillRunRows, randInt(32, 1L << 30));
// Randomized the spill injection with a percentage less than 100.
spillPct = 20;
}

if (abandonPartial) {
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/fuzzer/AggregationFuzzerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ class AggregationFuzzerBase {
AggregationFuzzerBase::ReferenceQueryErrorCode errorCode);
};

int32_t randInt(int32_t min, int32_t max);

bool addSignature(
const std::string& name,
const FunctionSignaturePtr& signature);
Expand Down
Loading

0 comments on commit 5c67de4

Please sign in to comment.