Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Nov 30, 2023
1 parent b96d687 commit a981427
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 61 deletions.
15 changes: 4 additions & 11 deletions velox/exec/OrderBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,11 @@ void OrderBy::reclaim(
// non-reclaimable execution section.
if (noMoreInput_) {
sortBuffer_->spillOutput();
// TODO: reduce the log frequency if it is too verbose.
LOG(WARNING)
<< "Can't reclaim from order by operator which has started producing output: "
<< pool()->name()
<< ", usage: " << succinctBytes(pool()->currentBytes())
<< ", reservation: " << succinctBytes(pool()->reservedBytes());
return;
} else {
// TODO: support fine-grain disk spilling based on 'targetBytes' after
// having row container memory compaction support later.
sortBuffer_->spill();
}

// TODO: support fine-grain disk spilling based on 'targetBytes' after having
// row container memory compaction support later.
sortBuffer_->spill();
// Release the minimum reserved memory.
pool()->release();
}
Expand Down
66 changes: 28 additions & 38 deletions velox/exec/Spiller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,6 @@ void Spiller::spill(const RowContainerIterator* startRowIter) {
CHECK_NOT_FINALIZED();
VELOX_CHECK_NE(type_, Type::kHashJoinProbe);

// Marks all the partitions have been spilled as we don't support fine-grained
// spilling as for now.
markAllPartitionsSpilled();

fillSpillRuns(startRowIter);
Expand All @@ -495,11 +493,9 @@ void Spiller::spill(std::vector<char*> rows) {
return;
}

// Marks all the partitions have been spilled as we don't support fine-grained
// spilling as for now.
markAllPartitionsSpilled();

fillSpillRuns(rows);
fillSinglePartition(rows);
runSpill();
checkEmptySpillRuns();
}
Expand Down Expand Up @@ -568,33 +564,6 @@ void Spiller::finalizeSpill() {
finalized_ = true;
}

// Appends the first 'numRows' row pointers of 'rows' to 'spillRuns_' and adds
// each row's size, as reported by the row container, to the receiving run's
// 'numBytes'.
//
// If 'isSinglePartition' is true, no hashing is done and every row is placed
// in run 0; 'hashes' is not touched. Otherwise the hashes of all the key
// columns are computed into 'hashes' for the batch, and the run of each row is
// chosen from its hash via 'bits_'. In that case 'hashes' is indexed up to
// 'numRows', so the caller must have sized it accordingly.
void Spiller::fillSpillRuns(
    std::vector<char*>& rows,
    int32_t numRows,
    std::vector<uint64_t>& hashes,
    bool isSinglePartition) {
  if (!isSinglePartition) {
    // Calculate hashes for this batch of spill candidates. The row range is
    // only needed for hashing, so it is built inside this branch.
    const auto rowSet = folly::Range<char**>(rows.data(), numRows);
    // Read the key count once and keep the index signed to avoid mixing signed
    // and unsigned operands in the loop condition.
    const auto numKeys = static_cast<int32_t>(container_->keyTypes().size());
    for (int32_t keyIndex = 0; keyIndex < numKeys; ++keyIndex) {
      // The third argument is false for the first key column and true for the
      // rest.
      // NOTE(review): presumably this makes hash() combine the new value with
      // the one already stored in 'hashes' - confirm against
      // RowContainer::hash.
      container_->hash(keyIndex, rowSet, keyIndex > 0, hashes.data());
    }
  }

  // Put each row in its run.
  for (int32_t i = 0; i < numRows; ++i) {
    // TODO: consider to cache the hash bits in row container so we only
    // need to calculate them once.
    const auto partition = isSinglePartition
        ? 0
        : bits_.partition(hashes[i], state_.maxPartitions());
    VELOX_DCHECK_GE(partition, 0);
    auto& run = spillRuns_[partition];
    char* const row = rows[i];
    run.rows.push_back(row);
    run.numBytes += container_->rowSize(row);
  }
}

void Spiller::fillSpillRuns(const RowContainerIterator* startRowIter) {
checkEmptySpillRuns();

Expand All @@ -613,7 +582,26 @@ void Spiller::fillSpillRuns(const RowContainerIterator* startRowIter) {
for (;;) {
auto numRows = container_->listRows(
&iterator, rows.size(), RowContainer::kUnlimited, rows.data());
fillSpillRuns(rows, numRows, hashes, isSinglePartition);
// Calculate hashes for this batch of spill candidates.
auto rowSet = folly::Range<char**>(rows.data(), numRows);

if (!isSinglePartition) {
for (auto i = 0; i < container_->keyTypes().size(); ++i) {
container_->hash(i, rowSet, i > 0, hashes.data());
}
}

// Put each in its run.
for (auto i = 0; i < numRows; ++i) {
// TODO: consider to cache the hash bits in row container so we only
// need to calculate them once.
const auto partition = isSinglePartition
? 0
: bits_.partition(hashes[i], state_.maxPartitions());
VELOX_DCHECK_GE(partition, 0);
spillRuns_[partition].rows.push_back(rows[i]);
spillRuns_[partition].numBytes += container_->rowSize(rows[i]);
}
if (numRows == 0) {
break;
}
Expand All @@ -622,15 +610,17 @@ void Spiller::fillSpillRuns(const RowContainerIterator* startRowIter) {
updateSpillFillTime(execTimeUs);
}

void Spiller::fillSpillRuns(std::vector<char*>& rows) {
void Spiller::fillSinglePartition(std::vector<char*>& rows) {
VELOX_CHECK_EQ(bits_.numPartitions(), 1);
checkEmptySpillRuns();
uint64_t execTimeUs{0};
{
MicrosecondTimer timer(&execTimeUs);
auto numRows = rows.size();
std::vector<uint64_t> hashes(numRows);
const bool isSinglePartition = bits_.numPartitions() == 1;
fillSpillRuns(rows, numRows, hashes, isSinglePartition);
spillRuns_[0].rows =
SpillRows(rows.begin(), rows.end(), spillRuns_[0].rows.get_allocator());
for (auto row : rows) {
spillRuns_[0].numBytes += container_->rowSize(row);
}
}
updateSpillFillTime(execTimeUs);
}
Expand Down
19 changes: 7 additions & 12 deletions velox/exec/Spiller.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@ class Spiller {
/// The caller needs to erase them from the row container.
void spill(const RowContainerIterator& startRowIter);

/// Invoked to spill.Spill all rows pointed by the pointers in sortedRows.This
/// is only used by 'kOrderByOutput' spiller type to spill during the order by
/// Invoked to spill all the rows pointed by rows. This is used by
/// 'kOrderByOutput' spiller type to spill during the order by
/// output processing. Similarly, the spilled rows still stay in the row
/// container.The caller needs to erase them from the row container.
void spill(std::vector<char*> sortedRows);
/// container. The caller needs to erase them from the row container.
void spill(std::vector<char*> rows);

/// Append 'spillVector' into the spill file of given 'partition'. It is now
/// only used by the spilling operator which doesn't need data sort, such as
Expand Down Expand Up @@ -283,22 +283,17 @@ class Spiller {

void checkEmptySpillRuns() const;

/// Marks all the partitions as spilled, since fine-grained spilling is not
/// supported for now.
void markAllPartitionsSpilled();

// Prepares spill runs for the spillable data from all the hash partitions.
// If 'startRowIter' is not null, we prepare runs starting from the offset
// pointed by 'startRowIter'.
void fillSpillRuns(const RowContainerIterator* startRowIter = nullptr);

/// Prepares spill runs for the spillable data from all the hash partitions.
void fillSpillRuns(std::vector<char*>& rows);

/// Prepares spill runs for the spillable data from the rows.
void fillSpillRuns(
std::vector<char*>& rows,
int32_t numRows,
std::vector<uint64_t>& hashes,
bool isSinglePartition);
void fillSinglePartition(std::vector<char*>& rows);

// Writes out all the rows collected in spillRuns_.
void runSpill();
Expand Down

0 comments on commit a981427

Please sign in to comment.