diff --git a/velox/common/encode/Coding.h b/velox/common/encode/Coding.h index b7e691342352f..284c226da901d 100644 --- a/velox/common/encode/Coding.h +++ b/velox/common/encode/Coding.h @@ -264,7 +264,7 @@ class Varint { // Zig-zag encoding that maps signed integers with a small absolute value // to unsigned integers with a small (positive) value. // if x >= 0, ZigZag::encode(x) == 2*x -// if x < 0, ZigZag::encode(x) == -2*x + 1 +// if x < 0, ZigZag::encode(x) == -2*x + 1 class ZigZag { public: static uint64_t encode(int64_t val) { @@ -273,6 +273,10 @@ class ZigZag { return (static_cast(val) << 1) ^ (val >> 63); } + static __uint128_t encodeInt128(__int128_t val) { + return (static_cast<__uint128_t>(val) << 1) ^ (val >> 127); + } + template ::type> static T decode(U val) { return static_cast((val >> 1) ^ -(val & 1)); diff --git a/velox/dwio/dwrf/common/IntEncoder.h b/velox/dwio/dwrf/common/IntEncoder.h index 1865028384784..e9be2822532ea 100644 --- a/velox/dwio/dwrf/common/IntEncoder.h +++ b/velox/dwio/dwrf/common/IntEncoder.h @@ -117,6 +117,18 @@ class IntEncoder { } } + void writeHugeInt(int128_t value) { + if (!useVInts_) { + writeHugeIntLE(value); + } else { + if constexpr (isSigned) { + writeVsHugeInt(value); + } else { + writeVuHugeInt(value); + } + } + } + /** * Get size of buffer used so far. */ @@ -180,6 +192,12 @@ class IntEncoder { } FOLLY_ALWAYS_INLINE void writeLongLE(int64_t val); + FOLLY_ALWAYS_INLINE void writeVuHugeInt(uint128_t val); + FOLLY_ALWAYS_INLINE void writeVsHugeInt(int128_t val) { + writeVuHugeInt(ZigZag::encodeInt128(val)); + } + FOLLY_ALWAYS_INLINE void writeHugeIntLE(int128_t val); + private: template uint64_t @@ -369,4 +387,25 @@ void IntEncoder::writeLongLE(int64_t val) { } } +template +void IntEncoder::writeVuHugeInt(uint128_t val) { + while (true) { + if ((val & ~0x7f) == 0) { + writeByte(static_cast(val)); + return; + } + writeByte(static_cast(0x80 | (val & dwio::common::BASE_128_MASK))); + // Cast val to unsigned so as to force 0-fill right shift. + val >>= 7; + } +} + +template +void IntEncoder::writeHugeIntLE(int128_t val) { + for (auto i = 0; i < numBytes_; i++) { + writeByte(static_cast(val & dwio::common::BASE_256_MASK)); + val >>= 8; + } +} + } // namespace facebook::velox::dwrf diff --git a/velox/dwio/dwrf/test/ColumnWriterTest.cpp b/velox/dwio/dwrf/test/ColumnWriterTest.cpp index d03d7dc98ba08..4097cc64a44b7 100644 --- a/velox/dwio/dwrf/test/ColumnWriterTest.cpp +++ b/velox/dwio/dwrf/test/ColumnWriterTest.cpp @@ -203,7 +203,8 @@ constexpr uint32_t ITERATIONS = 100'000; template VectorPtr populateBatch( std::vector> const& data, - MemoryPool* pool) { + MemoryPool* pool, + const TypePtr& type = CppToType::create()) { BufferPtr values = AlignedBuffer::allocate(data.size(), pool); auto valuesPtr = values->asMutableRange(); @@ -223,12 +224,7 @@ VectorPtr populateBatch( } auto batch = std::make_shared>( - pool, - CppToType::create(), - nulls, - data.size(), - values, - std::vector{}); + pool, type, nulls, data.size(), values, std::vector{}); batch->setNullCount(nullCount); return batch; } @@ -329,7 +325,7 @@ void testDataTypeWriter( // write auto writer = BaseColumnWriter::create(context, *dataTypeWithId, sequence); auto size = data.size(); - auto batch = populateBatch(data, pool.get()); + auto batch = populateBatch(data, pool.get(), type); const size_t stripeCount = 2; const size_t strideCount = 3; @@ -445,6 +441,28 @@ TEST_F(ColumnWriterTest, TestNullBooleanWriter) { testDataTypeWriter(BOOLEAN(), data); } +TEST_F(ColumnWriterTest, testDecimalWriter) { + std::vector> shortDecimals; + for (auto i = 0; i < ITERATIONS; ++i) { + if (i % 15) { + shortDecimals.emplace_back(i); + } else { + shortDecimals.emplace_back(std::nullopt); + } + } + testDataTypeWriter(DECIMAL(10, 2), shortDecimals); + + std::vector> longDecimals; + for (auto i = 0; i < ITERATIONS; ++i) { + if (i % 15) { + longDecimals.emplace_back(HugeInt::build(123 * i, 345 * i + 678)); + } else { + longDecimals.emplace_back(std::nullopt); + } + } + testDataTypeWriter(DECIMAL(38, 2), longDecimals); +} + TEST_F(ColumnWriterTest, TestTimestampEpochWriter) { std::vector> data; // This value will be corrupted. verified in verifyValue method. diff --git a/velox/dwio/dwrf/writer/ColumnWriter.cpp b/velox/dwio/dwrf/writer/ColumnWriter.cpp index 425f9b5df289c..57b3084ac100e 100644 --- a/velox/dwio/dwrf/writer/ColumnWriter.cpp +++ b/velox/dwio/dwrf/writer/ColumnWriter.cpp @@ -697,6 +697,89 @@ class TimestampColumnWriter : public BaseColumnWriter { std::unique_ptr> nanos_; }; +class DecimalColumnWriter : public BaseColumnWriter { + public: + DecimalColumnWriter( + WriterContext& context, + const TypeWithId& type, + uint32_t sequence, + std::function onRecordPosition) + : BaseColumnWriter{context, type, sequence, onRecordPosition}, + type_{type.type()}, + unscaledValues_{createDirectEncoder( + newStream(StreamKind::StreamKind_DATA), + getConfig(Config::USE_VINTS), + type_->isShortDecimal() ? LONG_BYTE_SIZE : 2 * LONG_BYTE_SIZE)}, + scales_{createRleEncoder( + RleVersion_1, + // DWRF's NANO_DATA has the same enum value as ORC's SECONDARY. + newStream(StreamKind::StreamKind_NANO_DATA), + context.getConfig(Config::USE_VINTS), + LONG_BYTE_SIZE)} { + reset(); + } + + uint64_t write(const VectorPtr& slice, const common::Ranges& ranges) + override { + VELOX_CHECK( + slice->type()->equivalent(*type_), + "Unexpected vector type: {}.", + slice->type()->toString()); + auto localDecoded = decode(slice, ranges); + auto& decodedVector = localDecoded.get(); + writeNulls(decodedVector, ranges); + + size_t count = 0; + auto [_, scale] = getDecimalPrecisionScale(*type_); + + for (auto& pos : ranges) { + if (!decodedVector.mayHaveNulls() || !decodedVector.isNullAt(pos)) { + if (type_->isShortDecimal()) { + auto val = decodedVector.valueAt(pos); + unscaledValues_->writeValue(val); + scales_->writeValue(scale); + } else { + auto val = decodedVector.valueAt(pos); + unscaledValues_->writeHugeInt(val); + scales_->writeValue(scale); + } + ++count; + } + } + + indexStatsBuilder_->increaseValueCount(count); + if (count != ranges.size()) { + indexStatsBuilder_->setHasNull(); + } + + const uint32_t decimalSize = + type_->isShortDecimal() ? 2 * LONG_BYTE_SIZE : 3 * LONG_BYTE_SIZE; + const auto rawSize = + count * decimalSize + (ranges.size() - count) * NULL_SIZE; + indexStatsBuilder_->increaseRawSize(rawSize); + return rawSize; + } + + void flush( + std::function encodingFactory, + std::function encodingOverride) override { + BaseColumnWriter::flush(encodingFactory, encodingOverride); + unscaledValues_->flush(); + scales_->flush(); + } + + void recordPosition() override { + BaseColumnWriter::recordPosition(); + unscaledValues_->recordPosition(*indexBuilder_); + scales_->recordPosition(*indexBuilder_); + } + + private: + TypePtr type_; + std::unique_ptr> unscaledValues_; + std::unique_ptr> scales_; +}; + namespace { FOLLY_ALWAYS_INLINE int64_t formatTime(int64_t seconds, uint64_t nanos) { DWIO_ENSURE(seconds >= MIN_SECONDS); @@ -2036,9 +2119,17 @@ std::unique_ptr BaseColumnWriter::create( case TypeKind::INTEGER: return std::make_unique>( context, type, sequence, onRecordPosition); - case TypeKind::BIGINT: + case TypeKind::BIGINT: { + if (type.type()->isDecimal()) { + return std::make_unique( + context, type, sequence, onRecordPosition); + } return std::make_unique>( context, type, sequence, onRecordPosition); + } + case TypeKind::HUGEINT: + return std::make_unique( + context, type, sequence, onRecordPosition); case TypeKind::REAL: return std::make_unique>( context, type, sequence, onRecordPosition);