diff --git a/velox/common/encode/Coding.h b/velox/common/encode/Coding.h index b7e691342352..284c226da901 100644 --- a/velox/common/encode/Coding.h +++ b/velox/common/encode/Coding.h @@ -264,7 +264,7 @@ class Varint { // Zig-zag encoding that maps signed integers with a small absolute value // to unsigned integers with a small (positive) value. // if x >= 0, ZigZag::encode(x) == 2*x -// if x < 0, ZigZag::encode(x) == -2*x + 1 +// if x < 0, ZigZag::encode(x) == -2*x + 1 class ZigZag { public: static uint64_t encode(int64_t val) { @@ -273,6 +273,10 @@ class ZigZag { return (static_cast(val) << 1) ^ (val >> 63); } + static __uint128_t encodeInt128(__int128_t val) { + return (static_cast<__uint128_t>(val) << 1) ^ (val >> 127); + } + template ::type> static T decode(U val) { return static_cast((val >> 1) ^ -(val & 1)); diff --git a/velox/common/encode/tests/CMakeLists.txt b/velox/common/encode/tests/CMakeLists.txt index 90c9733ecf22..9127554b688a 100644 --- a/velox/common/encode/tests/CMakeLists.txt +++ b/velox/common/encode/tests/CMakeLists.txt @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -add_executable(velox_common_encode_test Base64Test.cpp) +add_executable(velox_common_encode_test Base64Test.cpp ZigZagTest.cpp) add_test(velox_common_encode_test velox_common_encode_test) target_link_libraries( velox_common_encode_test diff --git a/velox/common/encode/tests/ZigZagTest.cpp b/velox/common/encode/tests/ZigZagTest.cpp new file mode 100644 index 000000000000..a69d6326056d --- /dev/null +++ b/velox/common/encode/tests/ZigZagTest.cpp @@ -0,0 +1,41 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/encode/Coding.h" +#include "velox/type/HugeInt.h" + +namespace facebook::velox::encode::test { + +class ZigZagTest : public ::testing::Test {}; + +TEST_F(ZigZagTest, hugeInt) { + auto assertZigZag = [](int128_t value) { + auto encoded = ZigZag::encodeInt128(value); + auto decoded = ZigZag::decode(encoded); + EXPECT_EQ(value, decoded); + }; + + assertZigZag(0); + assertZigZag(HugeInt::parse("1234567890123456789")); + assertZigZag(HugeInt::parse("-1234567890123456789")); + assertZigZag(HugeInt::parse(std::string(38, '9'))); + assertZigZag(std::numeric_limits<__int128_t>::max()); + assertZigZag(std::numeric_limits<__int128_t>::min()); +} + +} // namespace facebook::velox::encode::test diff --git a/velox/dwio/dwrf/common/IntEncoder.h b/velox/dwio/dwrf/common/IntEncoder.h index 186502838478..1051967bb39b 100644 --- a/velox/dwio/dwrf/common/IntEncoder.h +++ b/velox/dwio/dwrf/common/IntEncoder.h @@ -117,6 +117,21 @@ class IntEncoder { } } + /// Write a huge int value at a time. This method could be optimized to encode + /// a batch of values at a time and avoid condition checking for each write + /// following 'addImpl'. + virtual void writeHugeInt(int128_t value) { + if (!useVInts_) { + VELOX_NYI("Huge int encoding not supported for fixed length."); + } else { + if constexpr (isSigned) { + writeVsHugeInt(value); + } else { + writeVuHugeInt(value); + } + } + } + /** * Get size of buffer used so far. */ @@ -180,6 +195,12 @@ class IntEncoder { } FOLLY_ALWAYS_INLINE void writeLongLE(int64_t val); + FOLLY_ALWAYS_INLINE void writeVuHugeInt(uint128_t val); + + FOLLY_ALWAYS_INLINE void writeVsHugeInt(int128_t val) { + writeVuHugeInt(ZigZag::encodeInt128(val)); + } + private: template uint64_t @@ -369,4 +390,17 @@ void IntEncoder::writeLongLE(int64_t val) { } } +template +void IntEncoder::writeVuHugeInt(uint128_t val) { + while (true) { + if ((val & ~0x7f) == 0) { + writeByte(static_cast(val)); + return; + } + writeByte(static_cast(0x80 | (val & dwio::common::BASE_128_MASK))); + // Cast val to unsigned so as to force 0-fill right shift. + val >>= 7; + } +} + } // namespace facebook::velox::dwrf diff --git a/velox/dwio/dwrf/test/ColumnWriterTest.cpp b/velox/dwio/dwrf/test/ColumnWriterTest.cpp index d03d7dc98ba0..050070451fe7 100644 --- a/velox/dwio/dwrf/test/ColumnWriterTest.cpp +++ b/velox/dwio/dwrf/test/ColumnWriterTest.cpp @@ -203,10 +203,22 @@ constexpr uint32_t ITERATIONS = 100'000; template VectorPtr populateBatch( std::vector> const& data, - MemoryPool* pool) { + MemoryPool* pool, + const TypePtr& type = CppToType::create()) { BufferPtr values = AlignedBuffer::allocate(data.size(), pool); auto valuesPtr = values->asMutableRange(); + const size_t nulloptCount = + std::count(data.begin(), data.end(), std::nullopt); + if (nulloptCount == 0) { + size_t index = 0; + for (auto val : data) { + valuesPtr[index++] = val.value(); + } + return std::make_shared>( + pool, type, nullptr, data.size(), values, std::vector{}); + } + BufferPtr nulls = allocateNulls(data.size(), pool); auto* nullsPtr = nulls->asMutable(); size_t index = 0; @@ -223,12 +235,7 @@ VectorPtr populateBatch( } auto batch = std::make_shared>( - pool, - CppToType::create(), - nulls, - data.size(), - values, - std::vector{}); + pool, type, nulls, data.size(), values, std::vector{}); batch->setNullCount(nullCount); return batch; } @@ -278,8 +285,14 @@ void verifyBatch( const uint32_t seed) { auto size = data.size(); ASSERT_EQ(out->size(), size) << "Batch size mismatch with seed " << seed; - ASSERT_EQ(nullCount, out->getNullCount()) - << "nullCount mismatch with seed " << seed; + if (nullCount == std::nullopt) { + const auto outNullCount = out->getNullCount(); + ASSERT_TRUE(outNullCount == std::nullopt || outNullCount == 0) + << "nullCount mismatch with seed " << seed; + } else { + ASSERT_EQ(nullCount, out->getNullCount()) + << "nullCount mismatch with seed " << seed; + } auto outFv = std::dynamic_pointer_cast>(out); size_t index = 0; @@ -314,7 +327,8 @@ template void testDataTypeWriter( const TypePtr& type, std::vector>& data, - const uint32_t sequence = 0) { + const uint32_t sequence = 0, + DwrfFormat format = DwrfFormat::kDwrf) { // Generate a seed and randomly shuffle the data uint32_t seed = Random::rand32(); std::shuffle(data.begin(), data.end(), std::default_random_engine(seed)); @@ -327,9 +341,10 @@ void testDataTypeWriter( auto dataTypeWithId = TypeWithId::create(type, 1); // write - auto writer = BaseColumnWriter::create(context, *dataTypeWithId, sequence); + auto writer = BaseColumnWriter::create( + context, *dataTypeWithId, sequence, nullptr, format); auto size = data.size(); - auto batch = populateBatch(data, pool.get()); + auto batch = populateBatch(data, pool.get(), type); const size_t stripeCount = 2; const size_t strideCount = 3; @@ -445,6 +460,43 @@ TEST_F(ColumnWriterTest, TestNullBooleanWriter) { testDataTypeWriter(BOOLEAN(), data); } +TEST_F(ColumnWriterTest, testDecimalWriter) { + const auto format = DwrfFormat::kOrc; + auto genShortDecimals = [&](bool hasNull) { + std::vector> shortDecimals; + for (auto i = 0; i < ITERATIONS; ++i) { + if (!hasNull || i % 15) { + shortDecimals.emplace_back(i); + } else { + shortDecimals.emplace_back(std::nullopt); + } + } + return shortDecimals; + }; + + auto shortValues = genShortDecimals(false); + testDataTypeWriter(DECIMAL(10, 2), shortValues, 0 /*sequence*/, format); + shortValues = genShortDecimals(true); + testDataTypeWriter(DECIMAL(10, 2), shortValues, 0 /*sequence*/, format); + + auto genLongDecimals = [&](bool hasNull) { + std::vector> longDecimals; + for (auto i = 0; i < ITERATIONS; ++i) { + if (!hasNull || i % 15) { + longDecimals.emplace_back(HugeInt::build(123 * i, 456 * i + 789)); + } else { + longDecimals.emplace_back(std::nullopt); + } + } + return longDecimals; + }; + + auto longValues = genLongDecimals(false); + testDataTypeWriter(DECIMAL(38, 4), longValues, 0 /*sequence*/, format); + longValues = genLongDecimals(true); + testDataTypeWriter(DECIMAL(38, 4), longValues, 0 /*sequence*/, format); +} + TEST_F(ColumnWriterTest, TestTimestampEpochWriter) { std::vector> data; // This value will be corrupted. verified in verifyValue method. diff --git a/velox/dwio/dwrf/test/TestIntDirect.cpp b/velox/dwio/dwrf/test/TestIntDirect.cpp index 9b7e9e425086..2736c52f49b5 100644 --- a/velox/dwio/dwrf/test/TestIntDirect.cpp +++ b/velox/dwio/dwrf/test/TestIntDirect.cpp @@ -135,6 +135,37 @@ void testInts(std::function generator) { } } +// Test round-trip writing and reading of huge ints. Only vInts are supported. +template +void testHugeInts(const std::vector& vec) { + auto pool = memory::memoryManager()->addLeafPool(); + const size_t count = vec.size(); + + const size_t capacity = count * folly::kMaxVarintLength64 * 2; + MemorySink sink{capacity, {.pool = pool.get()}}; + DataBufferHolder holder{*pool, capacity, 0, DEFAULT_PAGE_GROW_RATIO, &sink}; + auto output = std::make_unique(holder); + auto encoder = createDirectEncoder( + std::move(output), true /*useVInts*/, sizeof(int128_t)); + + for (size_t i = 0; i < count; ++i) { + encoder->writeHugeInt(vec[i]); + } + encoder->flush(); + + const size_t expectedSize = capacity; + auto input = std::make_unique( + sink.data(), expectedSize, expectedSize); + auto decoder = std::make_unique>( + std::move(input), true /*useVInts*/, sizeof(int128_t)); + + std::vector vals(count); + decoder->nextValues(vals.data(), count, nullptr); + for (size_t i = 0; i < count; ++i) { + ASSERT_EQ(vec[i], vals[i]); + } +} + class DirectTest : public testing::Test { protected: static void SetUpTestCase() { @@ -229,3 +260,15 @@ TEST_F(DirectTest, corruptedInts) { testCorruptedVarInts(); testCorruptedVarInts(); } + +TEST_F(DirectTest, hugeInts) { + folly::Random::DefaultGenerator rng; + std::vector vec; + constexpr size_t count = 10240; + for (auto i = 0; i < count; ++i) { + vec.emplace_back( + HugeInt::build(folly::Random::rand64(), folly::Random::rand64())); + } + testHugeInts(vec); + testHugeInts(vec); +} diff --git a/velox/dwio/dwrf/writer/ColumnWriter.cpp b/velox/dwio/dwrf/writer/ColumnWriter.cpp index 425f9b5df289..c0a2db118a4b 100644 --- a/velox/dwio/dwrf/writer/ColumnWriter.cpp +++ b/velox/dwio/dwrf/writer/ColumnWriter.cpp @@ -697,6 +697,118 @@ class TimestampColumnWriter : public BaseColumnWriter { std::unique_ptr> nanos_; }; +class DecimalColumnWriter : public BaseColumnWriter { + public: + DecimalColumnWriter( + WriterContext& context, + const TypeWithId& type, + uint32_t sequence, + std::function onRecordPosition) + : BaseColumnWriter{context, type, sequence, std::move(onRecordPosition)}, + type_{type.type()}, + scale_{getDecimalPrecisionScale(*type_).second}, + isShortDecimal_{type_->isShortDecimal()}, + unscaledValues_{createDirectEncoder( + newStream(StreamKind::StreamKind_DATA), + // IntDecoder and IntEncoder only support vInts for huge ints. + isShortDecimal_ ? getConfig(Config::USE_VINTS) : true /*useVInts*/, + isShortDecimal_ ? LONG_BYTE_SIZE : 2 * LONG_BYTE_SIZE)}, + scales_{createRleEncoder( + RleVersion_1, + // DWRF's NANO_DATA has the same enum value as ORC's SECONDARY. + newStream(StreamKind::StreamKind_NANO_DATA), + getConfig(Config::USE_VINTS), + LONG_BYTE_SIZE)} { + reset(); + } + + uint64_t write(const VectorPtr& slice, const common::Ranges& ranges) + override { + VELOX_CHECK( + slice->type()->equivalent(*type_), + "Unexpected vector type: {}.", + slice->type()->toString()); + + // Always decode to reduce the number of branches. Fast paths for flat and + // constant encodings can be added for further optimization. + auto localDecoded = decode(slice, ranges); + auto& decodedVector = localDecoded.get(); + writeNulls(decodedVector, ranges); + + size_t count = 0; + if (isShortDecimal_) { + if (!decodedVector.mayHaveNulls()) { + for (auto pos : ranges) { + const auto val = decodedVector.valueAt(pos); + unscaledValues_->writeValue(val); + scales_->writeValue(scale_); + } + count += ranges.size(); + } else { + for (auto pos : ranges) { + if (!decodedVector.isNullAt(pos)) { + const auto val = decodedVector.valueAt(pos); + unscaledValues_->writeValue(val); + scales_->writeValue(scale_); + ++count; + } + } + } + } else { + if (!decodedVector.mayHaveNulls()) { + for (auto pos : ranges) { + const auto val = decodedVector.valueAt(pos); + unscaledValues_->writeHugeInt(val); + scales_->writeValue(scale_); + } + count += ranges.size(); + } else { + for (auto pos : ranges) { + if (!decodedVector.isNullAt(pos)) { + const auto val = decodedVector.valueAt(pos); + unscaledValues_->writeHugeInt(val); + scales_->writeValue(scale_); + ++count; + } + } + } + } + + indexStatsBuilder_->increaseValueCount(count); + if (count != ranges.size()) { + indexStatsBuilder_->setHasNull(); + } + + const uint32_t decimalSize = + isShortDecimal_ ? 2 * LONG_BYTE_SIZE : 3 * LONG_BYTE_SIZE; + const auto rawSize = + count * decimalSize + (ranges.size() - count) * NULL_SIZE; + indexStatsBuilder_->increaseRawSize(rawSize); + return rawSize; + } + + void flush( + std::function encodingFactory, + std::function encodingOverride) override { + BaseColumnWriter::flush(encodingFactory, encodingOverride); + unscaledValues_->flush(); + scales_->flush(); + } + + void recordPosition() override { + BaseColumnWriter::recordPosition(); + unscaledValues_->recordPosition(*indexBuilder_); + scales_->recordPosition(*indexBuilder_); + } + + private: + const TypePtr type_; + const uint8_t scale_; + const bool isShortDecimal_; + std::unique_ptr> unscaledValues_; + std::unique_ptr> scales_; +}; + namespace { FOLLY_ALWAYS_INLINE int64_t formatTime(int64_t seconds, uint64_t nanos) { DWIO_ENSURE(seconds >= MIN_SECONDS); @@ -1986,7 +2098,8 @@ std::unique_ptr BaseColumnWriter::create( WriterContext& context, const TypeWithId& type, uint32_t sequence, - std::function onRecordPosition) { + std::function onRecordPosition, + DwrfFormat format) { const auto flatMapEnabled = context.getConfig(Config::FLATTEN_MAP) && type.parent() != nullptr && (type.parent()->id() == 0); bool isFlatMapColumn{false}; @@ -2036,9 +2149,20 @@ std::unique_ptr BaseColumnWriter::create( case TypeKind::INTEGER: return std::make_unique>( context, type, sequence, onRecordPosition); - case TypeKind::BIGINT: + case TypeKind::BIGINT: { + if (format == DwrfFormat::kOrc && type.type()->isDecimal()) { + return std::make_unique( + context, type, sequence, onRecordPosition); + } return std::make_unique>( context, type, sequence, onRecordPosition); + } + case TypeKind::HUGEINT: { + if (format == DwrfFormat::kOrc) { + return std::make_unique( + context, type, sequence, onRecordPosition); + } + } case TypeKind::REAL: return std::make_unique>( context, type, sequence, onRecordPosition); diff --git a/velox/dwio/dwrf/writer/ColumnWriter.h b/velox/dwio/dwrf/writer/ColumnWriter.h index b404893d6515..98c5691babb9 100644 --- a/velox/dwio/dwrf/writer/ColumnWriter.h +++ b/velox/dwio/dwrf/writer/ColumnWriter.h @@ -151,7 +151,8 @@ class BaseColumnWriter : public ColumnWriter { WriterContext& context, const dwio::common::TypeWithId& type, const uint32_t sequence = 0, - std::function onRecordPosition = nullptr); + std::function onRecordPosition = nullptr, + DwrfFormat format = DwrfFormat::kDwrf); protected: BaseColumnWriter( diff --git a/velox/dwio/dwrf/writer/Writer.h b/velox/dwio/dwrf/writer/Writer.h index 8cc82303e10f..b96e8c7261c0 100644 --- a/velox/dwio/dwrf/writer/Writer.h +++ b/velox/dwio/dwrf/writer/Writer.h @@ -44,6 +44,7 @@ struct WriterOptions : public dwio::common::WriterOptions { columnWriterFactory; const tz::TimeZone* sessionTimezone{nullptr}; bool adjustTimestampToTimezone{false}; + DwrfFormat format{DwrfFormat::kDwrf}; }; class Writer : public dwio::common::Writer {