Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add decimal column writer for ORC file format #11431

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion velox/common/encode/Coding.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ class Varint {
// Zig-zag encoding that maps signed integers with a small absolute value
// to unsigned integers with a small (positive) value.
// if x >= 0, ZigZag::encode(x) == 2*x
// if x < 0, ZigZag::encode(x) == -2*x + 1
// if x < 0, ZigZag::encode(x) == -2*x + 1
class ZigZag {
public:
static uint64_t encode(int64_t val) {
Expand All @@ -273,6 +273,10 @@ class ZigZag {
return (static_cast<uint64_t>(val) << 1) ^ (val >> 63);
}

static __uint128_t encodeInt128(__int128_t val) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a test for this? Thanks!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added TEST_F(ZigZagTest, hugeInt). Thanks.

return (static_cast<__uint128_t>(val) << 1) ^ (val >> 127);
}

template <typename U, typename T = typename std::make_signed<U>::type>
static T decode(U val) {
return static_cast<T>((val >> 1) ^ -(val & 1));
Expand Down
2 changes: 1 addition & 1 deletion velox/common/encode/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

add_executable(velox_common_encode_test Base64Test.cpp)
add_executable(velox_common_encode_test Base64Test.cpp ZigZagTest.cpp)
add_test(velox_common_encode_test velox_common_encode_test)
target_link_libraries(
velox_common_encode_test
Expand Down
41 changes: 41 additions & 0 deletions velox/common/encode/tests/ZigZagTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <gtest/gtest.h>
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/encode/Coding.h"
#include "velox/type/HugeInt.h"

namespace facebook::velox::encode::test {

class ZigZagTest : public ::testing::Test {};

TEST_F(ZigZagTest, hugeInt) {
auto assertZigZag = [](int128_t value) {
auto encoded = ZigZag::encodeInt128(value);
auto decoded = ZigZag::decode(encoded);
EXPECT_EQ(value, decoded);
};

assertZigZag(0);
assertZigZag(HugeInt::parse("1234567890123456789"));
assertZigZag(HugeInt::parse("-1234567890123456789"));
assertZigZag(HugeInt::parse(std::string(38, '9')));
assertZigZag(std::numeric_limits<__int128_t>::max());
assertZigZag(std::numeric_limits<__int128_t>::min());
}

} // namespace facebook::velox::encode::test
34 changes: 34 additions & 0 deletions velox/dwio/dwrf/common/IntEncoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,21 @@ class IntEncoder {
}
}

/// Write a huge int value at a time. This method could be optimized to encode
/// a batch of values at a time and avoid condition checking for each write
/// following 'addImpl'.
virtual void writeHugeInt(int128_t value) {
if (!useVInts_) {
VELOX_NYI("Huge int encoding not supported for fixed length.");
} else {
if constexpr (isSigned) {
writeVsHugeInt(value);
} else {
writeVuHugeInt(value);
}
}
}

/**
* Get size of buffer used so far.
*/
Expand Down Expand Up @@ -180,6 +195,12 @@ class IntEncoder {
}
FOLLY_ALWAYS_INLINE void writeLongLE(int64_t val);

FOLLY_ALWAYS_INLINE void writeVuHugeInt(uint128_t val);

FOLLY_ALWAYS_INLINE void writeVsHugeInt(int128_t val) {
writeVuHugeInt(ZigZag::encodeInt128(val));
}

private:
template <typename T>
uint64_t
Expand Down Expand Up @@ -369,4 +390,17 @@ void IntEncoder<isSigned>::writeLongLE(int64_t val) {
}
}

template <bool isSigned>
void IntEncoder<isSigned>::writeVuHugeInt(uint128_t val) {
while (true) {
if ((val & ~0x7f) == 0) {
writeByte(static_cast<char>(val));
return;
}
writeByte(static_cast<char>(0x80 | (val & dwio::common::BASE_128_MASK)));
// Cast val to unsigned so as to force 0-fill right shift.
val >>= 7;
}
}

} // namespace facebook::velox::dwrf
76 changes: 64 additions & 12 deletions velox/dwio/dwrf/test/ColumnWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,22 @@ constexpr uint32_t ITERATIONS = 100'000;
template <typename T>
VectorPtr populateBatch(
std::vector<std::optional<T>> const& data,
MemoryPool* pool) {
MemoryPool* pool,
const TypePtr& type = CppToType<T>::create()) {
BufferPtr values = AlignedBuffer::allocate<T>(data.size(), pool);
auto valuesPtr = values->asMutableRange<T>();

const size_t nulloptCount =
std::count(data.begin(), data.end(), std::nullopt);
if (nulloptCount == 0) {
size_t index = 0;
for (auto val : data) {
valuesPtr[index++] = val.value();
}
return std::make_shared<FlatVector<T>>(
pool, type, nullptr, data.size(), values, std::vector<BufferPtr>{});
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When no null, use nullptr as null buffer. This helps test the !mayHaveNulls fast path.

}

BufferPtr nulls = allocateNulls(data.size(), pool);
auto* nullsPtr = nulls->asMutable<uint64_t>();
size_t index = 0;
Expand All @@ -223,12 +235,7 @@ VectorPtr populateBatch(
}

auto batch = std::make_shared<FlatVector<T>>(
pool,
CppToType<T>::create(),
nulls,
data.size(),
values,
std::vector<BufferPtr>{});
pool, type, nulls, data.size(), values, std::vector<BufferPtr>{});
batch->setNullCount(nullCount);
return batch;
}
Expand Down Expand Up @@ -278,8 +285,14 @@ void verifyBatch(
const uint32_t seed) {
auto size = data.size();
ASSERT_EQ(out->size(), size) << "Batch size mismatch with seed " << seed;
ASSERT_EQ(nullCount, out->getNullCount())
<< "nullCount mismatch with seed " << seed;
if (nullCount == std::nullopt) {
const auto outNullCount = out->getNullCount();
ASSERT_TRUE(outNullCount == std::nullopt || outNullCount == 0)
<< "nullCount mismatch with seed " << seed;
} else {
ASSERT_EQ(nullCount, out->getNullCount())
<< "nullCount mismatch with seed " << seed;
}

auto outFv = std::dynamic_pointer_cast<FlatVector<T>>(out);
size_t index = 0;
Expand Down Expand Up @@ -314,7 +327,8 @@ template <typename T>
void testDataTypeWriter(
const TypePtr& type,
std::vector<std::optional<T>>& data,
const uint32_t sequence = 0) {
const uint32_t sequence = 0,
DwrfFormat format = DwrfFormat::kDwrf) {
// Generate a seed and randomly shuffle the data
uint32_t seed = Random::rand32();
std::shuffle(data.begin(), data.end(), std::default_random_engine(seed));
Expand All @@ -327,9 +341,10 @@ void testDataTypeWriter(
auto dataTypeWithId = TypeWithId::create(type, 1);

// write
auto writer = BaseColumnWriter::create(context, *dataTypeWithId, sequence);
auto writer = BaseColumnWriter::create(
context, *dataTypeWithId, sequence, nullptr, format);
auto size = data.size();
auto batch = populateBatch(data, pool.get());
auto batch = populateBatch(data, pool.get(), type);
const size_t stripeCount = 2;
const size_t strideCount = 3;

Expand Down Expand Up @@ -445,6 +460,43 @@ TEST_F(ColumnWriterTest, TestNullBooleanWriter) {
testDataTypeWriter(BOOLEAN(), data);
}

TEST_F(ColumnWriterTest, testDecimalWriter) {
const auto format = DwrfFormat::kOrc;
auto genShortDecimals = [&](bool hasNull) {
std::vector<std::optional<int64_t>> shortDecimals;
for (auto i = 0; i < ITERATIONS; ++i) {
if (!hasNull || i % 15) {
shortDecimals.emplace_back(i);
} else {
shortDecimals.emplace_back(std::nullopt);
}
}
return shortDecimals;
};

auto shortValues = genShortDecimals(false);
testDataTypeWriter(DECIMAL(10, 2), shortValues, 0 /*sequence*/, format);
shortValues = genShortDecimals(true);
testDataTypeWriter(DECIMAL(10, 2), shortValues, 0 /*sequence*/, format);

auto genLongDecimals = [&](bool hasNull) {
std::vector<std::optional<int128_t>> longDecimals;
for (auto i = 0; i < ITERATIONS; ++i) {
if (!hasNull || i % 15) {
longDecimals.emplace_back(HugeInt::build(123 * i, 456 * i + 789));
} else {
longDecimals.emplace_back(std::nullopt);
}
}
return longDecimals;
};

auto longValues = genLongDecimals(false);
testDataTypeWriter(DECIMAL(38, 4), longValues, 0 /*sequence*/, format);
longValues = genLongDecimals(true);
testDataTypeWriter(DECIMAL(38, 4), longValues, 0 /*sequence*/, format);
}

TEST_F(ColumnWriterTest, TestTimestampEpochWriter) {
std::vector<std::optional<Timestamp>> data;
// This value will be corrupted. verified in verifyValue method.
Expand Down
43 changes: 43 additions & 0 deletions velox/dwio/dwrf/test/TestIntDirect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,37 @@ void testInts(std::function<T()> generator) {
}
}

// Test round-trip writing and reading of huge ints. Only vInts are supported.
template <bool isSigned>
void testHugeInts(const std::vector<int128_t>& vec) {
auto pool = memory::memoryManager()->addLeafPool();
const size_t count = vec.size();

const size_t capacity = count * folly::kMaxVarintLength64 * 2;
MemorySink sink{capacity, {.pool = pool.get()}};
DataBufferHolder holder{*pool, capacity, 0, DEFAULT_PAGE_GROW_RATIO, &sink};
auto output = std::make_unique<BufferedOutputStream>(holder);
auto encoder = createDirectEncoder<isSigned>(
std::move(output), true /*useVInts*/, sizeof(int128_t));

for (size_t i = 0; i < count; ++i) {
encoder->writeHugeInt(vec[i]);
}
encoder->flush();

const size_t expectedSize = capacity;
auto input = std::make_unique<SeekableArrayInputStream>(
sink.data(), expectedSize, expectedSize);
auto decoder = std::make_unique<dwio::common::DirectDecoder<isSigned>>(
std::move(input), true /*useVInts*/, sizeof(int128_t));

std::vector<int128_t> vals(count);
decoder->nextValues(vals.data(), count, nullptr);
for (size_t i = 0; i < count; ++i) {
ASSERT_EQ(vec[i], vals[i]);
}
}

class DirectTest : public testing::Test {
protected:
static void SetUpTestCase() {
Expand Down Expand Up @@ -229,3 +260,15 @@ TEST_F(DirectTest, corruptedInts) {
testCorruptedVarInts<false>();
testCorruptedVarInts<true>();
}

TEST_F(DirectTest, hugeInts) {
folly::Random::DefaultGenerator rng;
std::vector<int128_t> vec;
constexpr size_t count = 10240;
for (auto i = 0; i < count; ++i) {
vec.emplace_back(
HugeInt::build(folly::Random::rand64(), folly::Random::rand64()));
}
testHugeInts<true>(vec);
testHugeInts<false>(vec);
}
Loading
Loading