Skip to content

Commit

Permalink
feat: Make PartitionedOutput reclaimable
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Dec 17, 2024
1 parent 929affe commit fc9d92b
Show file tree
Hide file tree
Showing 20 changed files with 1,702 additions and 161 deletions.
96 changes: 48 additions & 48 deletions velox/common/file/File.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,43 +42,44 @@

namespace facebook::velox {

// A read-only file. All methods in this object should be thread safe.
/// A read-only file. All methods in this object should be thread safe.
class ReadFile {
public:
virtual ~ReadFile() = default;

// Reads the data at [offset, offset + length) into the provided pre-allocated
// buffer 'buf'. The bytes are returned as a string_view pointing to 'buf'.
//
// This method should be thread safe.
/// Reads the data at [offset, offset + length) into the provided
/// pre-allocated buffer 'buf'. The bytes are returned as a string_view
/// pointing to 'buf'.
///
/// This method should be thread safe.
virtual std::string_view pread(uint64_t offset, uint64_t length, void* buf)
const = 0;

// Same as above, but returns owned data directly.
//
// This method should be thread safe.
/// Same as above, but returns owned data directly.
///
/// This method should be thread safe.
virtual std::string pread(uint64_t offset, uint64_t length) const;

// Reads starting at 'offset' into the memory referenced by the
// Ranges in 'buffers'. The buffers are filled left to right. A
// buffer with nullptr data will cause its size worth of bytes to be skipped.
//
// This method should be thread safe.
/// Reads starting at 'offset' into the memory referenced by the
/// Ranges in 'buffers'. The buffers are filled left to right. A
/// buffer with nullptr data will cause its size worth of bytes to be skipped.
///
/// This method should be thread safe.
virtual uint64_t preadv(
uint64_t /*offset*/,
const std::vector<folly::Range<char*>>& /*buffers*/) const;

// Vectorized read API. Implementations can coalesce and parallelize.
// The offsets don't need to be sorted.
// `iobufs` is a range of IOBufs to store the read data. They
// will be stored in the same order as the input `regions` vector. So the
// array must be pre-allocated by the caller, with the same size as `regions`,
// but don't need to be initialized, since each iobuf will be copy-constructed
// by the preadv.
// Returns the total number of bytes read, which might be different than the
// sum of all buffer sizes (for example, if coalescing was used).
//
// This method should be thread safe.
/// Vectorized read API. Implementations can coalesce and parallelize.
/// The offsets don't need to be sorted.
/// `iobufs` is a range of IOBufs to store the read data. They
/// will be stored in the same order as the input `regions` vector. So the
/// array must be pre-allocated by the caller, with the same size as
/// `regions`, but don't need to be initialized, since each iobuf will be
/// copy-constructed by the preadv. Returns the total number of bytes read,
/// which might be different than the sum of all buffer sizes (for example, if
/// coalescing was used).
///
/// This method should be thread safe.
virtual uint64_t preadv(
folly::Range<const common::Region*> regions,
folly::Range<folly::IOBuf*> iobufs) const;
Expand All @@ -98,25 +99,25 @@ class ReadFile {
}
}

// Returns true if preadvAsync has a native implementation that is
// asynchronous. The default implementation is synchronous.
/// Returns true if preadvAsync has a native implementation that is
/// asynchronous. The default implementation is synchronous.
virtual bool hasPreadvAsync() const {
// Base-class answer: no native asynchronous preadvAsync. Being virtual, a
// subclass can override this to report otherwise.
return false;
}

// Whether preads should be coalesced where possible. E.g. remote disk would
// set to true, in-memory to false.
/// Whether preads should be coalesced where possible. E.g. remote disk would
/// set to true, in-memory to false.
virtual bool shouldCoalesce() const = 0;

// Number of bytes in the file.
/// Number of bytes in the file.
virtual uint64_t size() const = 0;

// An estimate for the total amount of memory *this uses.
/// An estimate for the total amount of memory *this uses.
virtual uint64_t memoryUsage() const = 0;

// The total number of bytes *this had been used to read since creation or
// the last resetBytesRead. We sum all the |length| variables passed to
// preads, not the actual amount of bytes read (which might be less).
/// The total number of bytes *this had been used to read since creation or
/// the last resetBytesRead. We sum all the |length| variables passed to
/// preads, not the actual amount of bytes read (which might be less).
virtual uint64_t bytesRead() const {
// 'bytesRead_' is a std::atomic<uint64_t>; returning it performs an atomic
// load of the current counter value.
return bytesRead_;
}
Expand All @@ -135,8 +136,8 @@ class ReadFile {
mutable std::atomic<uint64_t> bytesRead_ = 0;
};

// A write-only file. Nothing written to the file should be read back until it
// is closed.
/// A write-only file. Nothing written to the file should be read back until it
/// is closed.
class WriteFile {
public:
virtual ~WriteFile() = default;
Expand Down Expand Up @@ -193,14 +194,13 @@ class WriteFile {
virtual uint64_t size() const = 0;
};

// We currently do a simple implementation for the in-memory files
// that simply resizes a string as needed. If this ever gets used in
// a performance-sensitive path we'd probably want to move to a Cord-like
// implementation for underlying storage.

// We don't provide registration functions for the in-memory files, as they
// aren't intended for any robust use needing a filesystem.

/// We currently do a simple implementation for the in-memory files
/// that simply resizes a string as needed. If this ever gets used in
/// a performance-sensitive path we'd probably want to move to a Cord-like
/// implementation for underlying storage.
///
/// We don't provide registration functions for the in-memory files, as they
/// aren't intended for any robust use needing a filesystem.
class InMemoryReadFile : public ReadFile {
public:
explicit InMemoryReadFile(std::string_view file) : file_(file) {}
Expand Down Expand Up @@ -307,18 +307,18 @@ class LocalReadFile final : public ReadFile {
class LocalWriteFile final : public WriteFile {
public:
struct Attributes {
// If set to true, the file will not be subject to copy-on-write updates.
// This flag has an effect only on filesystems that support copy-on-write
// semantics, such as Btrfs.
/// If set to true, the file will not be subject to copy-on-write updates.
/// This flag has an effect only on filesystems that support copy-on-write
/// semantics, such as Btrfs.
static constexpr std::string_view kNoCow{"write-on-copy-disabled"};
static constexpr bool kDefaultNoCow{false};

static bool cowDisabled(
const std::unordered_map<std::string, std::string>& attrs);
};

// An error is thrown if a file already exists at |path|,
// unless flag shouldThrowOnFileAlreadyExists is false
/// An error is thrown if a file already exists at |path|,
/// unless flag shouldThrowOnFileAlreadyExists is false
explicit LocalWriteFile(
std::string_view path,
bool shouldCreateParentDirectories = false,
Expand Down
3 changes: 3 additions & 0 deletions velox/common/memory/ByteStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,9 @@ void ByteOutputStream::extend(int32_t bytes) {
ranges_.size() == 1 ? nullptr : &ranges_[ranges_.size() - 2],
current_);
allocatedBytes_ += current_->size;
if (allocatedBytes_ <= 0) {
VELOX_CHECK_GT(allocatedBytes_, 0);
}
VELOX_CHECK_GT(allocatedBytes_, 0);
if (isBits_) {
// size and position are in units of bits for a bits stream.
Expand Down
1 change: 1 addition & 0 deletions velox/common/memory/StreamArena.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ void StreamArena::newTinyRange(
range->buffer = reinterpret_cast<uint8_t*>(tinyRanges_.back().data());
range->size = bytes;
}

void StreamArena::clear() {
allocations_.clear();
pool_->freeNonContiguous(allocation_);
Expand Down
4 changes: 4 additions & 0 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -1385,6 +1385,10 @@ class PartitionedOutputNode : public PlanNode {
return sources_;
}

/// Reports whether this node may spill: it must be partitioned, and the
/// query config must have partitioned output spilling enabled.
bool canSpill(const QueryConfig& queryConfig) const override {
  // A non-partitioned output never spills; the config is not consulted.
  if (!isPartitioned()) {
    return false;
  }
  return queryConfig.partitionedOutputSpillEnabled();
}

/// Returns the output row type of this node's first source.
const RowTypePtr& inputType() const {
  const auto& firstSource = sources_[0];
  return firstSource->outputType();
}
Expand Down
9 changes: 9 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ class QueryConfig {
static constexpr const char* kTopNRowNumberSpillEnabled =
"topn_row_number_spill_enabled";

/// PartitionedOutput spilling flag, only applies if "spill_enabled" flag is
/// set.
static constexpr const char* kPartitionedOutputSpillEnabled =
"partitioned_output_spill_enabled";

/// The max row numbers to fill and spill for each spill run. This is used to
/// cap the memory used for spilling. If it is zero, then there is no limit
/// and spilling might run out of memory.
Expand Down Expand Up @@ -656,6 +661,10 @@ class QueryConfig {
return get<bool>(kTopNRowNumberSpillEnabled, true);
}

/// Returns the value of the "partitioned_output_spill_enabled" config,
/// which defaults to true when unset.
bool partitionedOutputSpillEnabled() const {
  constexpr bool kEnabledByDefault = true;
  return get<bool>(kPartitionedOutputSpillEnabled, kEnabledByDefault);
}

/// Returns the configured value for 'kMaxSpillLevel', falling back to 1
/// when the config is unset.
int32_t maxSpillLevel() const {
  constexpr int32_t kDefaultMaxSpillLevel = 1;
  return get<int32_t>(kMaxSpillLevel, kDefaultMaxSpillLevel);
}
Expand Down
4 changes: 2 additions & 2 deletions velox/core/QueryCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
this->queryConfig_.testingOverrideConfigUnsafe(std::move(values));
}

// Overrides the previous connector-specific configuration. Note that this
// function is NOT thread-safe and should probably only be used in tests.
/// Overrides the previous connector-specific configuration. Note that this
/// function is NOT thread-safe and should probably only be used in tests.
void setConnectorSessionOverridesUnsafe(
const std::string& connectorId,
std::unordered_map<std::string, std::string>&& configOverrides) {
Expand Down
39 changes: 2 additions & 37 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,43 +108,8 @@ velox::memory::MemoryPool* DriverCtx::addOperatorPool(

// Builds the spill configuration for the operator identified by 'operatorId'.
// Returns std::nullopt when spilling is disabled in the query config, or when
// the task has neither a spill directory nor a callback to create one.
//
// NOTE(review): as the text stands, every path through the inline construction
// below returns before the trailing 'task->makeSpillConfig(...)' call, which
// makes that last statement unreachable. This looks like a rendered diff that
// shows the removed inline body together with the added delegation to Task --
// confirm which of the two is meant to remain.
std::optional<common::SpillConfig> DriverCtx::makeSpillConfig(
int32_t operatorId) const {
const auto& queryConfig = task->queryCtx()->queryConfig();
// Spilling turned off for the query: no config.
if (!queryConfig.spillEnabled()) {
return std::nullopt;
}
// No spill directory set and no way to create one: no config.
if (task->spillDirectory().empty() && !task->hasCreateSpillDirectoryCb()) {
return std::nullopt;
}
// The callback captures 'this' and resolves the directory through the task
// each time it is invoked.
common::GetSpillDirectoryPathCB getSpillDirPathCb =
[this]() -> std::string_view {
return task->getOrCreateSpillDirectory();
};
// Spill file prefix is "<pipelineId>_<driverId>_<operatorId>".
const auto& spillFilePrefix =
fmt::format("{}_{}_{}", pipelineId, driverId, operatorId);
// Forwards the spilled byte count to the query context, which updates its
// total and checks it against the limit.
common::UpdateAndCheckSpillLimitCB updateAndCheckSpillLimitCb =
[this](uint64_t bytes) {
task->queryCtx()->updateSpilledBytesAndCheckLimit(bytes);
};
return common::SpillConfig(
std::move(getSpillDirPathCb),
std::move(updateAndCheckSpillLimitCb),
spillFilePrefix,
queryConfig.maxSpillFileSize(),
queryConfig.spillWriteBufferSize(),
queryConfig.spillReadBufferSize(),
task->queryCtx()->spillExecutor(),
queryConfig.minSpillableReservationPct(),
queryConfig.spillableReservationGrowthPct(),
queryConfig.spillStartPartitionBit(),
queryConfig.spillNumPartitionBits(),
queryConfig.maxSpillLevel(),
queryConfig.maxSpillRunRows(),
queryConfig.writerFlushThresholdBytes(),
queryConfig.spillCompressionKind(),
// Prefix sort config is passed only when spill prefix sort is enabled.
queryConfig.spillPrefixSortEnabled()
? std::optional<common::PrefixSortConfig>(prefixSortConfig())
: std::nullopt,
queryConfig.spillFileCreateConfig());
// Delegation to the task, using the same "<pipelineId>_<driverId>_<operatorId>"
// prefix format as above.
return task->makeSpillConfig(
fmt::format("{}_{}_{}", pipelineId, driverId, operatorId));
}

std::atomic_uint64_t BlockingState::numBlockedDrivers_{0};
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/ExchangeQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class SerializedPage {
// Buffers containing the serialized data. The memory is owned by 'iobuf_'.
std::vector<ByteRange> ranges_;

// IOBuf holding the data in 'ranges_.
// IOBuf holding the data in 'ranges_'.
std::unique_ptr<folly::IOBuf> iobuf_;

// Number of payload bytes in 'iobuf_'.
Expand Down
14 changes: 9 additions & 5 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,14 @@ OperatorStats Operator::stats(bool clear) {
return stats;
}

/// Sanity check run after a reclaim attempt on this operator. Fails the
/// check, naming this operator's memory pool, if 'reclaimedBytes' is
/// negative.
void Operator::postReclaimCheck(int64_t reclaimedBytes) const {
  // The smallest acceptable reclaim result; anything below it fails.
  constexpr int64_t kMinReclaimedBytes = 0;
  VELOX_CHECK_GE(
      reclaimedBytes,
      kMinReclaimedBytes,
      "Unexpected memory growth after reclaim from operator memory pool {}",
      pool()->name());
}

void Operator::close() {
input_ = nullptr;
results_.clear();
Expand Down Expand Up @@ -750,11 +758,7 @@ uint64_t Operator::MemoryReclaimer::reclaim(
memory::ScopedReclaimedBytesRecorder recoder(pool, &reclaimedBytes);
op_->reclaim(targetBytes, stats);
}
VELOX_CHECK_GE(
reclaimedBytes,
0,
"Unexpected memory growth after reclaim from operator memory pool {}",
pool->name());
op_->postReclaimCheck(reclaimedBytes);
return reclaimedBytes;
},
stats);
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,8 @@ class Operator : public BaseRuntimeStatWriter {
uint64_t targetBytes,
memory::MemoryReclaimer::Stats& stats) {}

virtual void postReclaimCheck(int64_t reclaimedBytes) const;

// Returns the plan node id reported by this operator's 'operatorCtx_'.
const core::PlanNodeId& planNodeId() const {
return operatorCtx_->planNodeId();
}
Expand Down
Loading

0 comments on commit fc9d92b

Please sign in to comment.