diff --git a/velox/common/base/CMakeLists.txt b/velox/common/base/CMakeLists.txt index cfbfdc90ea669..62a4679e1e582 100644 --- a/velox/common/base/CMakeLists.txt +++ b/velox/common/base/CMakeLists.txt @@ -29,6 +29,7 @@ velox_add_library( BitUtil.cpp Counters.cpp Fs.cpp + IOUtils.cpp PeriodicStatsReporter.cpp RandomUtil.cpp RawVector.cpp diff --git a/velox/common/base/IOUtils.cpp b/velox/common/base/IOUtils.cpp new file mode 100644 index 0000000000000..df8e333fe2276 --- /dev/null +++ b/velox/common/base/IOUtils.cpp @@ -0,0 +1,33 @@ +/* +* Copyright (c) Facebook, Inc. and its affiliates. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "velox/common/base/IOUtils.h" +#include "velox/common/memory/ByteStream.h" + +namespace facebook::velox::common { + +std::vector writableByteRangesFromIOBuf(folly::IOBuf* iobuf) { + std::vector byteRanges; + auto* current = iobuf; + do { + byteRanges.push_back( + {current->writableData(), (int32_t)current->length(), 0}); + current = current->next(); + } while (current != iobuf); + return byteRanges; +} + +} // namespace facebook::velox::common diff --git a/velox/common/base/IOUtils.h b/velox/common/base/IOUtils.h index 281c97ed36f71..a83726ec0c59d 100644 --- a/velox/common/base/IOUtils.h +++ b/velox/common/base/IOUtils.h @@ -18,7 +18,16 @@ #include #include +#include + +namespace facebook::velox { +struct ByteRange; +} + namespace facebook::velox::common { + +std::vector writableByteRangesFromIOBuf(folly::IOBuf* iobuf); + struct OutputByteStream { explicit OutputByteStream(char* data, int32_t offset = 0) : data_(data), offset_{offset} {} diff --git a/velox/common/base/tests/CMakeLists.txt b/velox/common/base/tests/CMakeLists.txt index 9a10ec270b86d..469ff70265af4 100644 --- a/velox/common/base/tests/CMakeLists.txt +++ b/velox/common/base/tests/CMakeLists.txt @@ -22,6 +22,7 @@ add_executable( ConcurrentCounterTest.cpp ExceptionTest.cpp FsTest.cpp + IOUtilsTest.cpp RangeTest.cpp RawVectorTest.cpp RuntimeMetricsTest.cpp diff --git a/velox/common/base/tests/IOUtilsTest.cpp b/velox/common/base/tests/IOUtilsTest.cpp new file mode 100644 index 0000000000000..d877aedc519f2 --- /dev/null +++ b/velox/common/base/tests/IOUtilsTest.cpp @@ -0,0 +1,55 @@ +/* +* Copyright (c) Facebook, Inc. and its affiliates. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include + +#include "velox/common/base/IOUtils.h" +#include "velox/common/memory/ByteStream.h" + +namespace facebook::velox::common { + +class IOUtilsTest : public testing::Test { + public: + std::unique_ptr createIOBuf(uint64_t size) { + auto buf = folly::IOBuf::create(size); + auto* writableData = buf->writableData(); + std::memset(writableData, '6', size); + buf->append(size); + return buf; + } +}; + +TEST_F(IOUtilsTest, iobufConsume) { + const uint64_t bufCapacity = 1024; + const uint64_t numChainedBuf = 64; + + auto head = createIOBuf(bufCapacity); + uint32_t count{1}; + folly::IOBuf* cur = head.get(); + while (count < numChainedBuf) { + cur->insertAfterThisOne(createIOBuf(bufCapacity)); + cur = cur->next(); + count++; + } + + auto bufList = writableByteRangesFromIOBuf(head.get()); + ASSERT_EQ(bufList.size(), numChainedBuf); + for (const auto& buf : bufList) { + ASSERT_EQ(buf.size, bufCapacity); + } +} + +} // namespace facebook::velox::common diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 4036e47912667..a5d881dc6d11e 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -20,6 +20,7 @@ #include #include "velox/common/base/Crc.h" +#include "velox/common/base/IOUtils.h" #include "velox/common/base/RawVector.h" #include "velox/common/memory/ByteStream.h" #include "velox/vector/BiasVector.h" @@ -4220,12 +4221,12 @@ void PrestoVectorSerde::deserialize( auto compressBuf = folly::IOBuf::create(header.compressedSize); source->readBytes(compressBuf->writableData(), header.compressedSize); compressBuf->append(header.compressedSize); + + // Process chained uncompressed results IOBufs. auto uncompress = codec->uncompress(compressBuf.get(), header.uncompressedSize); - ByteRange byteRange{ - uncompress->writableData(), (int32_t)uncompress->length(), 0}; - auto uncompressedSource = - std::make_unique(std::vector{byteRange}); + auto uncompressedSource = std::make_unique( + common::writableByteRangesFromIOBuf(uncompress.get())); readTopColumns( *uncompressedSource, type, pool, *result, resultOffset, prestoOptions); } diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp index 9f4817a00c76b..d195ca5f79a29 100644 --- a/velox/serializers/tests/PrestoSerializerTest.cpp +++ b/velox/serializers/tests/PrestoSerializerTest.cpp @@ -775,6 +775,15 @@ TEST_P(PrestoSerializerTest, basic) { testRoundTrip(rowVector); } +TEST_P(PrestoSerializerTest, basicLarge) { + const vector_size_t numRows = 80'000; + auto rowVector = makeRowVector( + {makeFlatVector(numRows, [](vector_size_t row) { return row; }), + makeFlatVector( + numRows, [](vector_size_t row) { return std::string(1024, 'x'); })}); + testRoundTrip(rowVector); +} + /// Test serialization of a dictionary vector that adds nulls to the base /// vector. TEST_P(PrestoSerializerTest, dictionaryWithExtraNulls) {