Skip to content

Commit

Permalink
Back out "Optimize PagedInputStream::Skip" (#6870)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #6870

Original commit changeset: 07241aaf71e8

Original Phabricator Diff: D49501856

Reviewed By: Yuhta

Differential Revision: D49872683

fbshipit-source-id: d33553ce7ff603caed5befcb8d12c67a0154fba3
  • Loading branch information
Prasoon Telang authored and facebook-github-bot committed Oct 3, 2023
1 parent 9db46c6 commit d08ab02
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 178 deletions.
58 changes: 19 additions & 39 deletions velox/dwio/common/compression/Compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,32 +246,24 @@ class ZstdDecompressor : public Decompressor {
explicit ZstdDecompressor(
uint64_t blockSize,
const std::string& streamDebugInfo)
: Decompressor{blockSize, streamDebugInfo}, context_{ZSTD_createDCtx()} {}

~ZstdDecompressor() override {
ZSTD_freeDCtx(context_);
}
: Decompressor{blockSize, streamDebugInfo} {}

uint64_t decompress(
const char* src,
uint64_t srcLength,
char* dest,
uint64_t destLength) override;

std::pair<int64_t, bool> getDecompressedLength(
const char* src,
uint64_t srcLength) const override;

private:
ZSTD_DCtx* context_;
uint64_t getUncompressedLength(const char* src, uint64_t srcLength)
const override;
};

uint64_t ZstdDecompressor::decompress(
const char* src,
uint64_t srcLength,
char* dest,
uint64_t destLength) {
auto ret = ZSTD_decompressDCtx(context_, dest, destLength, src, srcLength);
auto ret = ZSTD_decompress(dest, destLength, src, srcLength);
DWIO_ENSURE(
!ZSTD_isError(ret),
"ZSTD returned an error: ",
Expand All @@ -281,22 +273,22 @@ uint64_t ZstdDecompressor::decompress(
return ret;
}

std::pair<int64_t, bool> ZstdDecompressor::getDecompressedLength(
uint64_t ZstdDecompressor::getUncompressedLength(
const char* src,
uint64_t srcLength) const {
auto uncompressedLength = ZSTD_getFrameContentSize(src, srcLength);
// When the decompressed size is not available, return the upper bound
// (the block size).
if (uncompressedLength == ZSTD_CONTENTSIZE_UNKNOWN ||
uncompressedLength == ZSTD_CONTENTSIZE_ERROR) {
return {blockSize_, false};
return blockSize_;
}
DWIO_ENSURE_LE(
uncompressedLength,
blockSize_,
"Insufficient buffer size. Info: ",
streamDebugInfo_);
return {uncompressedLength, true};
return uncompressedLength;
}

class SnappyDecompressor : public Decompressor {
Expand All @@ -312,17 +304,16 @@ class SnappyDecompressor : public Decompressor {
char* dest,
uint64_t destLength) override;

std::pair<int64_t, bool> getDecompressedLength(
const char* src,
uint64_t srcLength) const override;
uint64_t getUncompressedLength(const char* src, uint64_t srcLength)
const override;
};

uint64_t SnappyDecompressor::decompress(
const char* src,
uint64_t srcLength,
char* dest,
uint64_t destLength) {
auto [length, _] = getDecompressedLength(src, srcLength);
auto length = getUncompressedLength(src, srcLength);
DWIO_ENSURE_GE(destLength, length);
DWIO_ENSURE(
snappy::RawUncompress(src, srcLength, dest),
Expand All @@ -331,24 +322,23 @@ uint64_t SnappyDecompressor::decompress(
return length;
}

std::pair<int64_t, bool> SnappyDecompressor::getDecompressedLength(
uint64_t SnappyDecompressor::getUncompressedLength(
const char* src,
uint64_t srcLength) const {
size_t uncompressedLength;
// When the decompressed size is not available, return the upper bound
// (the block size).
if (!snappy::GetUncompressedLength(src, srcLength, &uncompressedLength)) {
return {blockSize_, false};
return blockSize_;
}
DWIO_ENSURE_LE(
uncompressedLength,
blockSize_,
"Insufficient buffer size. Info: ",
streamDebugInfo_);
return {uncompressedLength, true};
return uncompressedLength;
}

// TODO: Is this really needed?
class ZlibDecompressionStream : public PagedInputStream,
private ZlibDecompressor {
public:
Expand All @@ -365,18 +355,13 @@ class ZlibDecompressionStream : public PagedInputStream,
ZlibDecompressor{blockSize, windowBits, streamDebugInfo, isGzip} {}
~ZlibDecompressionStream() override = default;

bool readOrSkip(const void** data, int32_t* size) override;
bool Next(const void** data, int32_t* size) override;
};

bool ZlibDecompressionStream::readOrSkip(const void** data, int32_t* size) {
if (data) {
VELOX_CHECK_EQ(pendingSkip_, 0);
}
bool ZlibDecompressionStream::Next(const void** data, int32_t* size) {
// if the user pushed back, return them the partial buffer
if (outputBufferLength_) {
if (data) {
*data = outputBufferPtr_;
}
*data = outputBufferPtr_;
*size = static_cast<int32_t>(outputBufferLength_);
outputBufferPtr_ += outputBufferLength_;
bytesReturned_ += outputBufferLength_;
Expand All @@ -396,9 +381,7 @@ bool ZlibDecompressionStream::readOrSkip(const void** data, int32_t* size) {
static_cast<size_t>(inputBufferPtrEnd_ - inputBufferPtr_),
remainingLength_);
if (state_ == State::ORIGINAL) {
if (data) {
*data = inputBufferPtr_;
}
*data = inputBufferPtr_;
*size = static_cast<int32_t>(availSize);
outputBufferPtr_ = inputBufferPtr_ + availSize;
outputBufferLength_ = 0;
Expand All @@ -410,8 +393,7 @@ bool ZlibDecompressionStream::readOrSkip(const void** data, int32_t* size) {
getName(),
" Info: ",
ZlibDecompressor::streamDebugInfo_);
prepareOutputBuffer(
getDecompressedLength(inputBufferPtr_, availSize).first);
prepareOutputBuffer(getUncompressedLength(inputBufferPtr_, availSize));

reset();
zstream_.next_in =
Expand Down Expand Up @@ -450,9 +432,7 @@ bool ZlibDecompressionStream::readOrSkip(const void** data, int32_t* size) {
}
} while (result != Z_STREAM_END);
*size = static_cast<int32_t>(blockSize_ - zstream_.avail_out);
if (data) {
*data = outputBufferPtr_;
}
*data = outputBufferPtr_;
outputBufferLength_ = 0;
outputBufferPtr_ += *size;
}
Expand Down
12 changes: 6 additions & 6 deletions velox/dwio/common/compression/Compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ class Compressor {
class Decompressor {
public:
explicit Decompressor(uint64_t blockSize, const std::string& streamDebugInfo)
: blockSize_{static_cast<int64_t>(blockSize)},
streamDebugInfo_{streamDebugInfo} {}
: blockSize_{blockSize}, streamDebugInfo_{streamDebugInfo} {}

virtual ~Decompressor() = default;

virtual std::pair<int64_t, bool /* Is the size exact? */>
getDecompressedLength(const char* /* src */, uint64_t /* srcLength */) const {
return {blockSize_, false};
virtual uint64_t getUncompressedLength(
const char* /* unused */,
uint64_t /* unused */) const {
return blockSize_;
}

virtual uint64_t decompress(
Expand All @@ -63,7 +63,7 @@ class Decompressor {
uint64_t destLength) = 0;

protected:
int64_t blockSize_;
uint64_t blockSize_;
const std::string streamDebugInfo_;
};

Expand Down
90 changes: 26 additions & 64 deletions velox/dwio/common/compression/PagedInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,21 +104,9 @@ const char* PagedInputStream::ensureInput(size_t availableInputBytes) {
}

bool PagedInputStream::Next(const void** data, int32_t* size) {
VELOX_CHECK_NOT_NULL(data);
skipAllPending();
return readOrSkip(data, size);
}

// Read into `data' if it is not null; otherwise skip some of the pending.
bool PagedInputStream::readOrSkip(const void** data, int32_t* size) {
if (data) {
VELOX_CHECK_EQ(pendingSkip_, 0);
}
// if the user pushed back, return them the partial buffer
if (outputBufferLength_) {
if (data) {
*data = outputBufferPtr_;
}
*data = outputBufferPtr_;
*size = static_cast<int32_t>(outputBufferLength_);
outputBufferPtr_ += outputBufferLength_;
bytesReturned_ += outputBufferLength_;
Expand Down Expand Up @@ -151,9 +139,7 @@ bool PagedInputStream::readOrSkip(const void** data, int32_t* size) {
// If no decompression or decryption is needed, simply adjust the output
// pointer. Otherwise, make sure we have a contiguous block.
if (original) {
if (data) {
*data = inputBufferPtr_;
}
*data = inputBufferPtr_;
*size = static_cast<int32_t>(availSize);
outputBufferPtr_ = inputBufferPtr_ + availSize;
inputBufferPtr_ += availSize;
Expand All @@ -168,35 +154,24 @@ bool PagedInputStream::readOrSkip(const void** data, int32_t* size) {
decrypter_->decrypt(folly::StringPiece{input, remainingLength_});
input = reinterpret_cast<const char*>(decryptionBuffer_->data());
remainingLength_ = decryptionBuffer_->length();
if (data) {
*data = input;
}
*data = input;
*size = remainingLength_;
outputBufferPtr_ = input + remainingLength_;
}

// perform decompression
if (state_ == State::START) {
DWIO_ENSURE_NOT_NULL(decompressor_.get(), "invalid stream state");
DWIO_ENSURE_NOT_NULL(input);
auto [decompressedLength, exact] =
decompressor_->getDecompressedLength(input, remainingLength_);
if (!data && exact && decompressedLength <= pendingSkip_) {
*size = decompressedLength;
outputBufferPtr_ = nullptr;
} else {
prepareOutputBuffer(decompressedLength);
outputBufferLength_ = decompressor_->decompress(
input,
remainingLength_,
outputBuffer_->data(),
outputBuffer_->capacity());
if (data) {
*data = outputBuffer_->data();
}
*size = static_cast<int32_t>(outputBufferLength_);
outputBufferPtr_ = outputBuffer_->data() + outputBufferLength_;
}
prepareOutputBuffer(
decompressor_->getUncompressedLength(input, remainingLength_));
outputBufferLength_ = decompressor_->decompress(
input,
remainingLength_,
outputBuffer_->data(),
outputBuffer_->capacity());
*data = outputBuffer_->data();
*size = static_cast<int32_t>(outputBufferLength_);
outputBufferPtr_ = outputBuffer_->data() + outputBufferLength_;
// release decryption buffer
decryptionBuffer_ = nullptr;
}
Expand All @@ -213,14 +188,6 @@ bool PagedInputStream::readOrSkip(const void** data, int32_t* size) {
}

void PagedInputStream::BackUp(int32_t count) {
if (pendingSkip_ > 0) {
auto len = std::min<int64_t>(count, pendingSkip_);
pendingSkip_ -= len;
count -= len;
if (count == 0) {
return;
}
}
DWIO_ENSURE(
outputBufferPtr_ != nullptr,
"Backup without previous Next in ",
Expand All @@ -240,29 +207,25 @@ void PagedInputStream::BackUp(int32_t count) {
bytesReturned_ -= count;
}

bool PagedInputStream::skipAllPending() {
while (pendingSkip_ > 0) {
bool PagedInputStream::Skip(int32_t count) {
// Naive implementation for now: every skipped block is still decompressed.
// Ideally, entire blocks should be skipped without decompressing them.
while (count > 0) {
const void* ptr;
int32_t len;
if (!readOrSkip(nullptr, &len)) {
if (!Next(&ptr, &len)) {
return false;
}
if (len > pendingSkip_) {
auto toBackUp = len - pendingSkip_;
pendingSkip_ = 0;
BackUp(toBackUp);
if (len > count) {
BackUp(len - count);
count = 0;
} else {
pendingSkip_ -= len;
count -= len;
}
}
return true;
}

bool PagedInputStream::Skip(int32_t count) {
pendingSkip_ += count;
// We never use the return value of this function so this is OK.
return true;
}

void PagedInputStream::clearDecompressionState() {
state_ = State::HEADER;
outputBufferLength_ = 0;
Expand All @@ -280,8 +243,7 @@ void PagedInputStream::seekToPosition(
// to the beginning of the last view or last header, whichever is
// later. If we are returning views into the decompression buffer,
// we can backup to the beginning of the decompressed buffer
auto alreadyRead =
bytesReturned_ - bytesReturnedAtLastHeaderOffset_ + pendingSkip_;
auto alreadyRead = bytesReturned_ - bytesReturnedAtLastHeaderOffset_;

// outsideOriginalWindow is true if we are returning views into
// the input stream's buffer and we are seeking below the start of the last
Expand All @@ -298,12 +260,12 @@ void PagedInputStream::seekToPosition(
auto provider = dwio::common::PositionProvider(positions);
input_->seekToPosition(provider);
clearDecompressionState();
pendingSkip_ = uncompressedOffset;
Skip(uncompressedOffset);
} else {
if (uncompressedOffset < alreadyRead) {
BackUp(alreadyRead - uncompressedOffset);
} else {
pendingSkip_ += uncompressedOffset - alreadyRead;
Skip(uncompressedOffset - alreadyRead);
}
}
}
Expand Down
Loading

0 comments on commit d08ab02

Please sign in to comment.