Skip to content

Commit

Permalink
Implement column statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
benibus committed Jun 14, 2023
1 parent 6522ab1 commit 2809fbf
Show file tree
Hide file tree
Showing 3 changed files with 442 additions and 106 deletions.
61 changes: 61 additions & 0 deletions cpp/src/parquet/float_internal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
#include <cstring>

#include "arrow/util/bit_util.h"
#include "arrow/util/ubsan.h"
#include "parquet/types.h"

namespace parquet {

struct float16 {
constexpr static uint16_t min() { return 0b1111101111111111; }
constexpr static uint16_t max() { return 0b0111101111111111; }
constexpr static uint16_t positive_zero() { return 0b0000000000000000; }
constexpr static uint16_t negative_zero() { return 0b1000000000000000; }

static uint8_t* min_ptr() { return min_; }
static uint8_t* max_ptr() { return max_; }
static uint8_t* positive_zero_ptr() { return positive_zero_; }
static uint8_t* negative_zero_ptr() { return negative_zero_; }

static bool is_nan(uint16_t n) { return (n & 0x7c00) == 0x7c00 && (n & 0x03ff) != 0; }
static bool is_zero(uint16_t n) { return (n & 0x7fff) == 0; }
static bool signbit(uint16_t n) { return (n & 0x8000) != 0; }

static uint16_t Pack(const uint8_t* src) {
return ::arrow::bit_util::FromLittleEndian(::arrow::util::SafeLoadAs<uint16_t>(src));
}
static uint16_t Pack(const FLBA& src) { return Pack(src.ptr); }

static uint8_t* Unpack(uint16_t src, uint8_t* dest) {
src = ::arrow::bit_util::ToLittleEndian(src);
return static_cast<uint8_t*>(std::memcpy(dest, &src, sizeof(src)));
}

private:
static inline uint8_t min_[] = {0b11111111, 0b11111011};
static inline uint8_t max_[] = {0b11111111, 0b01111011};
static inline uint8_t positive_zero_[] = {0b00000000, 0b00000000};
static inline uint8_t negative_zero_[] = {0b00000000, 0b10000000};
};

} // namespace parquet
144 changes: 126 additions & 18 deletions cpp/src/parquet/statistics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "arrow/visit_data_inline.h"
#include "parquet/encoding.h"
#include "parquet/exception.h"
#include "parquet/float_internal.h"
#include "parquet/platform.h"
#include "parquet/schema.h"

Expand Down Expand Up @@ -277,11 +278,54 @@ template <bool is_signed>
struct CompareHelper<FLBAType, is_signed>
: public BinaryLikeCompareHelperBase<FLBAType, is_signed> {};

struct Float16CompareHelper {
using T = FLBA;

static T DefaultMin() { return T{float16::max_ptr()}; }
static T DefaultMax() { return T{float16::min_ptr()}; }

static T Coalesce(T val, T fallback) {
return val.ptr != nullptr && float16::is_nan(float16::Pack(val)) ? fallback : val;
}

static inline bool Compare(int type_length, const T& a, const T& b) {
uint16_t l = float16::Pack(a);
uint16_t r = float16::Pack(b);

if (l & 0x8000) {
if (r & 0x8000) {
// Both are negative
return (l & 0x7fff) > (r & 0x7fff);
} else {
// Handle +/-0
return (l & 0x7fff) || r != 0;
}
} else if (r & 0x8000) {
return false;
} else {
// Both are positive
return (l & 0x7fff) < (r & 0x7fff);
}
}

static T Min(int type_length, const T& a, const T& b) {
if (a.ptr == nullptr) return b;
if (b.ptr == nullptr) return a;
return Compare(type_length, a, b) ? a : b;
}

static T Max(int type_length, const T& a, const T& b) {
if (a.ptr == nullptr) return b;
if (b.ptr == nullptr) return a;
return Compare(type_length, a, b) ? b : a;
}
};

using ::std::optional;

template <typename T>
::arrow::enable_if_t<std::is_integral<T>::value, optional<std::pair<T, T>>>
CleanStatistic(std::pair<T, T> min_max) {
CleanStatistic(std::pair<T, T> min_max, LogicalType::Type::type) {
return min_max;
}

Expand All @@ -292,7 +336,7 @@ CleanStatistic(std::pair<T, T> min_max) {
// - If max is -0.0f, replace with 0.0f
template <typename T>
::arrow::enable_if_t<std::is_floating_point<T>::value, optional<std::pair<T, T>>>
CleanStatistic(std::pair<T, T> min_max) {
CleanStatistic(std::pair<T, T> min_max, LogicalType::Type::type) {
T min = min_max.first;
T max = min_max.second;

Expand All @@ -318,26 +362,55 @@ CleanStatistic(std::pair<T, T> min_max) {
return {{min, max}};
}

optional<std::pair<FLBA, FLBA>> CleanStatistic(std::pair<FLBA, FLBA> min_max) {
optional<std::pair<FLBA, FLBA>> CleanFloat16Statistic(std::pair<FLBA, FLBA> min_max) {
FLBA min = min_max.first;
FLBA max = min_max.second;
uint16_t min_packed = float16::Pack(min);
uint16_t max_packed = float16::Pack(max);

if (float16::is_nan(min_packed) || float16::is_nan(max_packed)) {
return ::std::nullopt;
}

if (min_packed == float16::max() && max_packed == float16::min()) {
return ::std::nullopt;
}

if (min_packed == float16::positive_zero()) {
min = FLBA{float16::negative_zero_ptr()};
}
if (max_packed == float16::negative_zero()) {
max = FLBA{float16::positive_zero_ptr()};
}

return {{min, max}};
}

optional<std::pair<FLBA, FLBA>> CleanStatistic(std::pair<FLBA, FLBA> min_max,
LogicalType::Type::type logical_type) {
if (min_max.first.ptr == nullptr || min_max.second.ptr == nullptr) {
return ::std::nullopt;
}
if (logical_type == LogicalType::Type::FLOAT16) {
return CleanFloat16Statistic(std::move(min_max));
}
return min_max;
}

optional<std::pair<ByteArray, ByteArray>> CleanStatistic(
std::pair<ByteArray, ByteArray> min_max) {
std::pair<ByteArray, ByteArray> min_max, LogicalType::Type::type) {
if (min_max.first.ptr == nullptr || min_max.second.ptr == nullptr) {
return ::std::nullopt;
}
return min_max;
}

template <bool is_signed, typename DType>
template <bool is_signed, typename DType,
typename HelperType = CompareHelper<DType, is_signed>>
class TypedComparatorImpl : virtual public TypedComparator<DType> {
public:
using T = typename DType::c_type;
using Helper = CompareHelper<DType, is_signed>;
using Helper = HelperType;

explicit TypedComparatorImpl(int type_length = -1) : type_length_(type_length) {}

Expand Down Expand Up @@ -412,9 +485,9 @@ TypedComparatorImpl</*is_signed=*/false, Int32Type>::GetMinMax(const int32_t* va
return {SafeCopy<int32_t>(min), SafeCopy<int32_t>(max)};
}

template <bool is_signed, typename DType>
template <bool is_signed, typename DType, typename Helper>
std::pair<typename DType::c_type, typename DType::c_type>
TypedComparatorImpl<is_signed, DType>::GetMinMax(const ::arrow::Array& values) {
TypedComparatorImpl<is_signed, DType, Helper>::GetMinMax(const ::arrow::Array& values) {
ParquetException::NYI(values.type()->ToString());
}

Expand Down Expand Up @@ -458,6 +531,16 @@ std::pair<ByteArray, ByteArray> TypedComparatorImpl<false, ByteArrayType>::GetMi
return GetMinMaxBinaryHelper<false>(*this, values);
}

static LogicalType::Type::type LogicalTypeId(const ColumnDescriptor* descr) {
if (const auto& logical_type = descr->logical_type()) {
return logical_type->type();
}
return LogicalType::Type::NONE;
}
static LogicalType::Type::type LogicalTypeId(const Statistics& stats) {
return LogicalTypeId(stats.descr());
}

template <typename DType>
class TypedStatisticsImpl : public TypedStatistics<DType> {
public:
Expand All @@ -468,8 +551,7 @@ class TypedStatisticsImpl : public TypedStatistics<DType> {
pool_(pool),
min_buffer_(AllocateBuffer(pool_, 0)),
max_buffer_(AllocateBuffer(pool_, 0)) {
auto comp = Comparator::Make(descr);
comparator_ = std::static_pointer_cast<TypedComparator<DType>>(comp);
comparator_ = MakeComparator<DType>(descr);
TypedStatisticsImpl::Reset();
has_null_count_ = true;
has_distinct_count_ = true;
Expand Down Expand Up @@ -525,6 +607,19 @@ class TypedStatisticsImpl : public TypedStatistics<DType> {
bool Equals(const Statistics& raw_other) const override {
if (physical_type() != raw_other.physical_type()) return false;

const auto logical_id = LogicalTypeId(*this);
switch (logical_id) {
// Only compare against logical types that influence the interpretation of the
// physical type
case LogicalType::Type::FLOAT16:
if (LogicalTypeId(raw_other) != logical_id) {
return false;
}
break;
default:
break;
}

const auto& other = checked_cast<const TypedStatisticsImpl&>(raw_other);

if (has_min_max_ != other.has_min_max_) return false;
Expand Down Expand Up @@ -650,7 +745,7 @@ class TypedStatisticsImpl : public TypedStatistics<DType> {

void SetMinMaxPair(std::pair<T, T> min_max) {
// CleanStatistic can return a nullopt in case of erroneous values, e.g. NaN
auto maybe_min_max = CleanStatistic(min_max);
auto maybe_min_max = CleanStatistic(min_max, LogicalTypeId(*this));
if (!maybe_min_max) return;

auto min = maybe_min_max.value().first;
Expand Down Expand Up @@ -759,12 +854,8 @@ void TypedStatisticsImpl<ByteArrayType>::PlainDecode(const std::string& src,
dst->ptr = reinterpret_cast<const uint8_t*>(src.c_str());
}

} // namespace

// ----------------------------------------------------------------------
// Public factory functions

std::shared_ptr<Comparator> Comparator::Make(Type::type physical_type,
std::shared_ptr<Comparator> DoMakeComparator(Type::type physical_type,
LogicalType::Type::type logical_type,
SortOrder::type sort_order,
int type_length) {
if (SortOrder::SIGNED == sort_order) {
Expand All @@ -784,6 +875,10 @@ std::shared_ptr<Comparator> Comparator::Make(Type::type physical_type,
case Type::BYTE_ARRAY:
return std::make_shared<TypedComparatorImpl<true, ByteArrayType>>();
case Type::FIXED_LEN_BYTE_ARRAY:
if (logical_type == LogicalType::Type::FLOAT16) {
return std::make_shared<
TypedComparatorImpl<true, FLBAType, Float16CompareHelper>>();
}
return std::make_shared<TypedComparatorImpl<true, FLBAType>>(type_length);
default:
ParquetException::NYI("Signed Compare not implemented");
Expand All @@ -809,8 +904,21 @@ std::shared_ptr<Comparator> Comparator::Make(Type::type physical_type,
return nullptr;
}

} // namespace

// ----------------------------------------------------------------------
// Public factory functions

std::shared_ptr<Comparator> Comparator::Make(Type::type physical_type,
SortOrder::type sort_order,
int type_length) {
return DoMakeComparator(physical_type, LogicalType::Type::NONE, sort_order,
type_length);
}

std::shared_ptr<Comparator> Comparator::Make(const ColumnDescriptor* descr) {
return Make(descr->physical_type(), descr->sort_order(), descr->type_length());
return DoMakeComparator(descr->physical_type(), LogicalTypeId(descr),
descr->sort_order(), descr->type_length());
}

std::shared_ptr<Statistics> Statistics::Make(const ColumnDescriptor* descr,
Expand Down
Loading

0 comments on commit 2809fbf

Please sign in to comment.