-
Notifications
You must be signed in to change notification settings - Fork 453
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GLUTEN-6863][VL] Pre-alloc and reuse compress buffer to avoid OOM in spill #6869
Changes from 6 commits
b0500bf
c27e0f9
1c0146a
2a1385c
e34ce56
89b902c
3d2f472
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -186,45 +186,45 @@ arrow::Result<std::unique_ptr<BlockPayload>> BlockPayload::fromBuffers( | |
std::vector<std::shared_ptr<arrow::Buffer>> buffers, | ||
const std::vector<bool>* isValidityBuffer, | ||
arrow::MemoryPool* pool, | ||
arrow::util::Codec* codec) { | ||
arrow::util::Codec* codec, | ||
std::shared_ptr<arrow::Buffer> compressed) { | ||
if (payloadType == Payload::Type::kCompressed) { | ||
Timer compressionTime; | ||
compressionTime.start(); | ||
// Compress. | ||
// Compressed buffer layout: | buffer1 compressedLength | buffer1 uncompressedLength | buffer1 | ... | ||
const auto metadataLength = sizeof(int64_t) * 2 * buffers.size(); | ||
int64_t totalCompressedLength = | ||
std::accumulate(buffers.begin(), buffers.end(), 0LL, [&](auto sum, const auto& buffer) { | ||
if (!buffer) { | ||
return sum; | ||
} | ||
return sum + codec->MaxCompressedLen(buffer->size(), buffer->data()); | ||
}); | ||
const auto maxCompressedLength = metadataLength + totalCompressedLength; | ||
ARROW_ASSIGN_OR_RAISE( | ||
std::shared_ptr<arrow::ResizableBuffer> compressed, arrow::AllocateResizableBuffer(maxCompressedLength, pool)); | ||
|
||
auto output = compressed->mutable_data(); | ||
auto maxLength = maxCompressedLength(buffers, codec); | ||
std::shared_ptr<arrow::Buffer> compressedBuffer; | ||
uint8_t* output; | ||
if (compressed) { | ||
ARROW_RETURN_IF( | ||
compressed->size() < maxLength, | ||
arrow::Status::Invalid( | ||
"Compressed buffer length < maxCompressedLength. (", compressed->size(), " vs ", maxLength, ")")); | ||
output = const_cast<uint8_t*>(compressed->data()); | ||
} else { | ||
ARROW_ASSIGN_OR_RAISE(compressedBuffer, arrow::AllocateResizableBuffer(maxLength, pool)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we reuse the buffer for uncompressed payload type? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We hold the original evicted buffer for uncompressed payload. There are no extra copy. |
||
output = compressedBuffer->mutable_data(); | ||
} | ||
|
||
int64_t actualLength = 0; | ||
// Compress buffers one by one. | ||
for (auto& buffer : buffers) { | ||
auto availableLength = maxCompressedLength - actualLength; | ||
auto availableLength = maxLength - actualLength; | ||
// Release buffer after compression. | ||
ARROW_ASSIGN_OR_RAISE(auto compressedSize, compressBuffer(std::move(buffer), output, availableLength, codec)); | ||
output += compressedSize; | ||
actualLength += compressedSize; | ||
} | ||
|
||
ARROW_RETURN_IF(actualLength < 0, arrow::Status::Invalid("Writing compressed buffer out of bound.")); | ||
RETURN_NOT_OK(compressed->Resize(actualLength)); | ||
if (compressed) { | ||
compressedBuffer = std::make_shared<arrow::Buffer>(compressed->data(), actualLength); | ||
} else { | ||
RETURN_NOT_OK(std::dynamic_pointer_cast<arrow::ResizableBuffer>(compressedBuffer)->Resize(actualLength)); | ||
} | ||
compressionTime.stop(); | ||
auto payload = std::unique_ptr<BlockPayload>(new BlockPayload( | ||
Type::kCompressed, | ||
numRows, | ||
std::vector<std::shared_ptr<arrow::Buffer>>{compressed}, | ||
isValidityBuffer, | ||
pool, | ||
codec)); | ||
auto payload = std::unique_ptr<BlockPayload>( | ||
new BlockPayload(Type::kCompressed, numRows, {compressedBuffer}, isValidityBuffer, pool, codec)); | ||
payload->setCompressionTime(compressionTime.realTimeUsed()); | ||
return payload; | ||
} | ||
|
@@ -329,6 +329,21 @@ int64_t BlockPayload::rawSize() { | |
return getBufferSize(buffers_); | ||
} | ||
|
||
int64_t BlockPayload::maxCompressedLength( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we move it to anonymous namespace? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a public api for BlockPayload and is used by other components. |
||
const std::vector<std::shared_ptr<arrow::Buffer>>& buffers, | ||
arrow::util::Codec* codec) { | ||
// Compressed buffer layout: | buffer1 compressedLength | buffer1 uncompressedLength | buffer1 | ... | ||
const auto metadataLength = sizeof(int64_t) * 2 * buffers.size(); | ||
int64_t totalCompressedLength = | ||
std::accumulate(buffers.begin(), buffers.end(), 0LL, [&](auto sum, const auto& buffer) { | ||
if (!buffer) { | ||
return sum; | ||
} | ||
return sum + codec->MaxCompressedLen(buffer->size(), buffer->data()); | ||
}); | ||
return metadataLength + totalCompressedLength; | ||
} | ||
|
||
arrow::Result<std::unique_ptr<InMemoryPayload>> InMemoryPayload::merge( | ||
std::unique_ptr<InMemoryPayload> source, | ||
std::unique_ptr<InMemoryPayload> append, | ||
|
@@ -404,9 +419,13 @@ arrow::Result<std::unique_ptr<InMemoryPayload>> InMemoryPayload::merge( | |
return std::make_unique<InMemoryPayload>(mergedRows, isValidityBuffer, std::move(merged)); | ||
} | ||
|
||
arrow::Result<std::unique_ptr<BlockPayload>> | ||
InMemoryPayload::toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* pool, arrow::util::Codec* codec) { | ||
return BlockPayload::fromBuffers(payloadType, numRows_, std::move(buffers_), isValidityBuffer_, pool, codec); | ||
arrow::Result<std::unique_ptr<BlockPayload>> InMemoryPayload::toBlockPayload( | ||
Payload::Type payloadType, | ||
arrow::MemoryPool* pool, | ||
arrow::util::Codec* codec, | ||
std::shared_ptr<arrow::Buffer> compressed) { | ||
return BlockPayload::fromBuffers( | ||
payloadType, numRows_, std::move(buffers_), isValidityBuffer_, pool, codec, std::move(compressed)); | ||
} | ||
|
||
arrow::Status InMemoryPayload::serialize(arrow::io::OutputStream* outputStream) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we don't need
evictType
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hashEvict need this param to know whether the evict source is a spill or not. If it's spill, the partition writer will write the payload to disk immediately, otherwise it will cache the payload.