Skip to content

Commit

Permalink
Add decimal column writer
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Nov 12, 2024
1 parent f3fdec3 commit 43d046a
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 10 deletions.
6 changes: 5 additions & 1 deletion velox/common/encode/Coding.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ class Varint {
// Zig-zag encoding that maps signed integers with a small absolute value
// to unsigned integers with a small (positive) value.
// if x >= 0, ZigZag::encode(x) == 2*x
// if x < 0, ZigZag::encode(x) == -2*x - 1
// if x < 0, ZigZag::encode(x) == -2*x - 1
class ZigZag {
public:
static uint64_t encode(int64_t val) {
Expand All @@ -273,6 +273,10 @@ class ZigZag {
return (static_cast<uint64_t>(val) << 1) ^ (val >> 63);
}

// 128-bit zig-zag encoding: non-negative x maps to 2*x and negative x maps
// to -2*x - 1, so values with a small magnitude become small unsigned values.
static __uint128_t encodeInt128(__int128_t val) {
  // Shifting the sign bit across the whole word gives all ones for negative
  // input and zero otherwise (relies on the compiler's arithmetic right
  // shift for signed operands).
  const __int128_t signMask = val >> 127;
  const __uint128_t doubled = static_cast<__uint128_t>(val) << 1;
  return doubled ^ signMask;
}

template <typename U, typename T = typename std::make_signed<U>::type>
static T decode(U val) {
return static_cast<T>((val >> 1) ^ -(val & 1));
Expand Down
39 changes: 39 additions & 0 deletions velox/dwio/dwrf/common/IntEncoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,18 @@ class IntEncoder {
}
}

/**
 * Write a 128-bit integer. When variable-length integers are enabled the
 * value goes out as a varint (zig-zag encoded first for signed encoders);
 * otherwise it is written through writeHugeIntLE.
 */
void writeHugeInt(int128_t value) {
  if (useVInts_) {
    if constexpr (isSigned) {
      writeVsHugeInt(value);
    } else {
      writeVuHugeInt(value);
    }
    return;
  }
  writeHugeIntLE(value);
}

/**
* Get size of buffer used so far.
*/
Expand Down Expand Up @@ -180,6 +192,12 @@ class IntEncoder {
}
FOLLY_ALWAYS_INLINE void writeLongLE(int64_t val);

// Writes 'val' as a base-128 varint, lowest 7-bit group first.
FOLLY_ALWAYS_INLINE void writeVuHugeInt(uint128_t val);
// Zig-zag encodes the signed 'val' and writes the result as a varint.
FOLLY_ALWAYS_INLINE void writeVsHugeInt(int128_t val) {
  const auto zigzagged = ZigZag::encodeInt128(val);
  writeVuHugeInt(zigzagged);
}
// Writes 'numBytes_' bytes of 'val', lowest-order byte first.
FOLLY_ALWAYS_INLINE void writeHugeIntLE(int128_t val);

private:
template <typename T>
uint64_t
Expand Down Expand Up @@ -369,4 +387,25 @@ void IntEncoder<isSigned>::writeLongLE(int64_t val) {
}
}

// Emits 'val' as a base-128 varint: 7 payload bits per byte, lowest group
// first, with the high bit (0x80) set on every byte except the last.
template <bool isSigned>
void IntEncoder<isSigned>::writeVuHugeInt(uint128_t val) {
  // Keep emitting continuation bytes while bits above the low 7 remain.
  while ((val & ~0x7f) != 0) {
    writeByte(static_cast<char>(0x80 | (val & dwio::common::BASE_128_MASK)));
    // 'val' is an unsigned type, so this shift fills with zeros.
    val >>= 7;
  }
  // Final byte: the remaining bits fit in 7 bits, no continuation flag.
  writeByte(static_cast<char>(val));
}

// Writes 'numBytes_' bytes of 'val' in little-endian order: the lowest-order
// byte goes out first, then 'val' is shifted down by one byte.
template <bool isSigned>
void IntEncoder<isSigned>::writeHugeIntLE(int128_t val) {
  auto byteIndex = 0;
  while (byteIndex < numBytes_) {
    const auto lowByte = val & dwio::common::BASE_256_MASK;
    writeByte(static_cast<char>(lowByte));
    val >>= 8;
    ++byteIndex;
  }
}

} // namespace facebook::velox::dwrf
34 changes: 26 additions & 8 deletions velox/dwio/dwrf/test/ColumnWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ constexpr uint32_t ITERATIONS = 100'000;
template <typename T>
VectorPtr populateBatch(
std::vector<std::optional<T>> const& data,
MemoryPool* pool) {
MemoryPool* pool,
const TypePtr& type = CppToType<T>::create()) {
BufferPtr values = AlignedBuffer::allocate<T>(data.size(), pool);
auto valuesPtr = values->asMutableRange<T>();

Expand All @@ -223,12 +224,7 @@ VectorPtr populateBatch(
}

auto batch = std::make_shared<FlatVector<T>>(
pool,
CppToType<T>::create(),
nulls,
data.size(),
values,
std::vector<BufferPtr>{});
pool, type, nulls, data.size(), values, std::vector<BufferPtr>{});
batch->setNullCount(nullCount);
return batch;
}
Expand Down Expand Up @@ -329,7 +325,7 @@ void testDataTypeWriter(
// write
auto writer = BaseColumnWriter::create(context, *dataTypeWithId, sequence);
auto size = data.size();
auto batch = populateBatch(data, pool.get());
auto batch = populateBatch(data, pool.get(), type);
const size_t stripeCount = 2;
const size_t strideCount = 3;

Expand Down Expand Up @@ -445,6 +441,28 @@ TEST_F(ColumnWriterTest, TestNullBooleanWriter) {
testDataTypeWriter(BOOLEAN(), data);
}

// Runs testDataTypeWriter over a short decimal column (int64_t values) and a
// long decimal column (int128_t values). In both inputs every row whose index
// is a multiple of 15 is null.
TEST_F(ColumnWriterTest, testDecimalWriter) {
  std::vector<std::optional<int64_t>> shortDecimals;
  for (auto row = 0; row < ITERATIONS; ++row) {
    if (row % 15 == 0) {
      shortDecimals.emplace_back(std::nullopt);
      continue;
    }
    shortDecimals.emplace_back(row);
  }
  testDataTypeWriter(DECIMAL(10, 2), shortDecimals);

  std::vector<std::optional<int128_t>> longDecimals;
  for (auto row = 0; row < ITERATIONS; ++row) {
    if (row % 15 == 0) {
      longDecimals.emplace_back(std::nullopt);
      continue;
    }
    // Both arguments to HugeInt::build are non-zero so the value does not fit
    // in 64 bits (assuming build(upper, lower) — confirm against HugeInt).
    longDecimals.emplace_back(HugeInt::build(123 * row, 345 * row + 678));
  }
  testDataTypeWriter(DECIMAL(38, 2), longDecimals);
}

TEST_F(ColumnWriterTest, TestTimestampEpochWriter) {
std::vector<std::optional<Timestamp>> data;
// This value will be corrupted. verified in verifyValue method.
Expand Down
93 changes: 92 additions & 1 deletion velox/dwio/dwrf/writer/ColumnWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,89 @@ class TimestampColumnWriter : public BaseColumnWriter {
std::unique_ptr<IntEncoder<false>> nanos_;
};

// Column writer for decimal types. Each non-null row produces two entries:
// the unscaled value in the DATA stream (direct encoder) and the type's scale
// in the NANO_DATA stream (RLE v1 encoder). Short decimals are read from the
// vector as int64_t and long decimals as int128_t.
class DecimalColumnWriter : public BaseColumnWriter {
 public:
  DecimalColumnWriter(
      WriterContext& context,
      const TypeWithId& type,
      uint32_t sequence,
      std::function<void(IndexBuilder&)> onRecordPosition)
      : BaseColumnWriter{context, type, sequence, onRecordPosition},
        type_{type.type()},
        // Unscaled values take one long for short decimals, two for long
        // decimals.
        unscaledValues_{createDirectEncoder</* isSigned = */ true>(
            newStream(StreamKind::StreamKind_DATA),
            getConfig(Config::USE_VINTS),
            type_->isShortDecimal() ? LONG_BYTE_SIZE : 2 * LONG_BYTE_SIZE)},
        scales_{createRleEncoder</* isSigned = */ true>(
            RleVersion_1,
            // DWRF's NANO_DATA has the same enum value as ORC's SECONDARY.
            newStream(StreamKind::StreamKind_NANO_DATA),
            context.getConfig(Config::USE_VINTS),
            LONG_BYTE_SIZE)} {
    reset();
  }

  // Writes the rows of 'slice' selected by 'ranges' and returns the raw size
  // accounted for them. Fails the check if the vector's type is not
  // equivalent to the column type.
  uint64_t write(const VectorPtr& slice, const common::Ranges& ranges)
      override {
    VELOX_CHECK(
        slice->type()->equivalent(*type_),
        "Unexpected vector type: {}.",
        slice->type()->toString());
    auto decodedHolder = decode(slice, ranges);
    auto& decoded = decodedHolder.get();
    writeNulls(decoded, ranges);

    // The column type is fixed for the writer, so the short/long decision
    // and the scale are read once rather than per row.
    const bool isShort = type_->isShortDecimal();
    auto [_, scale] = getDecimalPrecisionScale(*type_);

    size_t nonNullCount = 0;
    for (const auto& row : ranges) {
      // Null rows contribute nothing to the value and scale streams.
      if (decoded.mayHaveNulls() && decoded.isNullAt(row)) {
        continue;
      }
      if (isShort) {
        unscaledValues_->writeValue(decoded.valueAt<int64_t>(row));
      } else {
        unscaledValues_->writeHugeInt(decoded.valueAt<int128_t>(row));
      }
      scales_->writeValue(scale);
      ++nonNullCount;
    }

    indexStatsBuilder_->increaseValueCount(nonNullCount);
    if (nonNullCount != ranges.size()) {
      indexStatsBuilder_->setHasNull();
    }

    // Raw size per non-null row: two longs for a short decimal, three for a
    // long decimal; each null row counts as NULL_SIZE.
    const uint32_t decimalSize =
        isShort ? 2 * LONG_BYTE_SIZE : 3 * LONG_BYTE_SIZE;
    const auto nullCount = ranges.size() - nonNullCount;
    const auto rawSize = nonNullCount * decimalSize + nullCount * NULL_SIZE;
    indexStatsBuilder_->increaseRawSize(rawSize);
    return rawSize;
  }

  // Flushes the base writer first, then the value and scale encoders.
  void flush(
      std::function<proto::ColumnEncoding&(uint32_t)> encodingFactory,
      std::function<void(proto::ColumnEncoding&)> encodingOverride) override {
    BaseColumnWriter::flush(encodingFactory, encodingOverride);
    unscaledValues_->flush();
    scales_->flush();
  }

  // Records the base writer's position, then the positions of the value and
  // scale encoders, into the index builder.
  void recordPosition() override {
    BaseColumnWriter::recordPosition();
    unscaledValues_->recordPosition(*indexBuilder_);
    scales_->recordPosition(*indexBuilder_);
  }

 private:
  // Decimal type of the column, taken from the TypeWithId at construction.
  TypePtr type_;
  // Encoder for the unscaled decimal values (DATA stream).
  std::unique_ptr<IntEncoder<true>> unscaledValues_;
  // Encoder for the per-row scale (NANO_DATA stream).
  std::unique_ptr<IntEncoder<true>> scales_;
};

namespace {
FOLLY_ALWAYS_INLINE int64_t formatTime(int64_t seconds, uint64_t nanos) {
DWIO_ENSURE(seconds >= MIN_SECONDS);
Expand Down Expand Up @@ -2036,9 +2119,17 @@ std::unique_ptr<BaseColumnWriter> BaseColumnWriter::create(
case TypeKind::INTEGER:
return std::make_unique<IntegerColumnWriter<int32_t>>(
context, type, sequence, onRecordPosition);
case TypeKind::BIGINT:
case TypeKind::BIGINT: {
if (type.type()->isDecimal()) {
return std::make_unique<DecimalColumnWriter>(
context, type, sequence, onRecordPosition);
}
return std::make_unique<IntegerColumnWriter<int64_t>>(
context, type, sequence, onRecordPosition);
}
case TypeKind::HUGEINT:
return std::make_unique<DecimalColumnWriter>(
context, type, sequence, onRecordPosition);
case TypeKind::REAL:
return std::make_unique<FloatColumnWriter<float>>(
context, type, sequence, onRecordPosition);
Expand Down

0 comments on commit 43d046a

Please sign in to comment.