-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
GH-43944: [C++][Parquet] Add support for arrow::ArrayStatistics: non zero-copy int based types (#43945)

### Rationale for this change

Statistics are useful for fast processing.

Target types:

* `UInt8`
* `Int8`
* `UInt16`
* `Int16`
* `UInt32`
* `UInt64`
* `Date32`
* `Time32`
* `Time64`
* `Duration`

### What changes are included in this PR?

Map `ColumnChunkMetaData` information to `arrow::ArrayStatistics`.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.

* GitHub Issue: #43944

Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
- Loading branch information
1 parent
7934ea4
commit c138b47
Showing 4 changed files with 187 additions and 29 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -319,26 +319,59 @@ void ReconstructChunksWithoutNulls(::arrow::ArrayVector* chunks) { | |
} | ||
|
||
template <typename ArrowType, typename ParquetType> | ||
Status TransferInt(RecordReader* reader, MemoryPool* pool, | ||
const std::shared_ptr<Field>& field, Datum* out) { | ||
Status TransferInt(RecordReader* reader, | ||
std::unique_ptr<::parquet::ColumnChunkMetaData> metadata, | ||
const ReaderContext* ctx, const std::shared_ptr<Field>& field, | ||
Datum* out) { | ||
using ArrowCType = typename ArrowType::c_type; | ||
using ParquetCType = typename ParquetType::c_type; | ||
int64_t length = reader->values_written(); | ||
ARROW_ASSIGN_OR_RAISE(auto data, | ||
::arrow::AllocateBuffer(length * sizeof(ArrowCType), pool)); | ||
::arrow::AllocateBuffer(length * sizeof(ArrowCType), ctx->pool)); | ||
|
||
auto values = reinterpret_cast<const ParquetCType*>(reader->values()); | ||
auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data()); | ||
std::copy(values, values + length, out_ptr); | ||
int64_t null_count = 0; | ||
std::vector<std::shared_ptr<Buffer>> buffers = {nullptr, std::move(data)}; | ||
if (field->nullable()) { | ||
*out = std::make_shared<ArrayType<ArrowType>>(field->type(), length, std::move(data), | ||
reader->ReleaseIsValid(), | ||
reader->null_count()); | ||
} else { | ||
*out = | ||
std::make_shared<ArrayType<ArrowType>>(field->type(), length, std::move(data), | ||
/*null_bitmap=*/nullptr, /*null_count=*/0); | ||
null_count = reader->null_count(); | ||
buffers[0] = reader->ReleaseIsValid(); | ||
} | ||
auto array_data = | ||
::arrow::ArrayData::Make(field->type(), length, std::move(buffers), null_count); | ||
auto array_statistics = std::make_shared<::arrow::ArrayStatistics>(); | ||
array_statistics->null_count = null_count; | ||
auto statistics = metadata->statistics().get(); | ||
if (statistics) { | ||
if (statistics->HasDistinctCount()) { | ||
array_statistics->distinct_count = statistics->distinct_count(); | ||
} | ||
if (statistics->HasMinMax()) { | ||
auto typed_statistics = | ||
static_cast<::parquet::TypedStatistics<ParquetType>*>(statistics); | ||
const ArrowCType min = typed_statistics->min(); | ||
const ArrowCType max = typed_statistics->max(); | ||
if (std::is_signed<ArrowCType>::value) { | ||
array_statistics->min = static_cast<int64_t>(min); | ||
array_statistics->max = static_cast<int64_t>(max); | ||
} else { | ||
array_statistics->min = static_cast<uint64_t>(min); | ||
array_statistics->max = static_cast<uint64_t>(max); | ||
} | ||
// We can assume that integer based min/max are always exact if | ||
// they exist. Apache Parquet's "Statistics" has | ||
// "is_min_value_exact" and "is_max_value_exact" but we can | ||
// ignore them for integer based min/max. | ||
// | ||
// See also the discussion at dev@parquet.apache.org: | ||
// https://lists.apache.org/thread/zfnmg5p51b7oylft5w5k4670wgkd4zv4 | ||
array_statistics->is_min_exact = true; | ||
array_statistics->is_max_exact = true; | ||
} | ||
} | ||
array_data->statistics = std::move(array_statistics); | ||
*out = std::make_shared<ArrayType<ArrowType>>(std::move(array_data)); | ||
return Status::OK(); | ||
} | ||
|
||
|
@@ -728,21 +761,26 @@ Status TransferHalfFloat(RecordReader* reader, MemoryPool* pool, | |
|
||
} // namespace | ||
|
||
#define TRANSFER_INT32(ENUM, ArrowType) \ | ||
case ::arrow::Type::ENUM: { \ | ||
Status s = TransferInt<ArrowType, Int32Type>(reader, pool, value_field, &result); \ | ||
RETURN_NOT_OK(s); \ | ||
#define TRANSFER_INT32(ENUM, ArrowType) \ | ||
case ::arrow::Type::ENUM: { \ | ||
Status s = TransferInt<ArrowType, Int32Type>(reader, std::move(metadata), ctx, \ | ||
value_field, &result); \ | ||
RETURN_NOT_OK(s); \ | ||
} break; | ||
|
||
#define TRANSFER_INT64(ENUM, ArrowType) \ | ||
case ::arrow::Type::ENUM: { \ | ||
Status s = TransferInt<ArrowType, Int64Type>(reader, pool, value_field, &result); \ | ||
RETURN_NOT_OK(s); \ | ||
#define TRANSFER_INT64(ENUM, ArrowType) \ | ||
case ::arrow::Type::ENUM: { \ | ||
Status s = TransferInt<ArrowType, Int64Type>(reader, std::move(metadata), ctx, \ | ||
value_field, &result); \ | ||
RETURN_NOT_OK(s); \ | ||
} break; | ||
|
||
Status TransferColumnData(RecordReader* reader, const std::shared_ptr<Field>& value_field, | ||
const ColumnDescriptor* descr, MemoryPool* pool, | ||
Status TransferColumnData(RecordReader* reader, | ||
std::unique_ptr<::parquet::ColumnChunkMetaData> metadata, | ||
const std::shared_ptr<Field>& value_field, | ||
const ColumnDescriptor* descr, const ReaderContext* ctx, | ||
std::shared_ptr<ChunkedArray>* out) { | ||
auto pool = ctx->pool; | ||
Datum result; | ||
std::shared_ptr<ChunkedArray> chunked_result; | ||
switch (value_field->type()->id()) { | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters