diff --git a/cpp/src/arrow/array/builder_dict.h b/cpp/src/arrow/array/builder_dict.h
index cb0aaf309915b..f46eaefc74b6f 100644
--- a/cpp/src/arrow/array/builder_dict.h
+++ b/cpp/src/arrow/array/builder_dict.h
@@ -724,6 +724,8 @@
 using BinaryDictionaryBuilder = DictionaryBuilder<BinaryType>;
 using StringDictionaryBuilder = DictionaryBuilder<StringType>;
 using BinaryDictionary32Builder = Dictionary32Builder<BinaryType>;
 using StringDictionary32Builder = Dictionary32Builder<StringType>;
+using LargeBinaryDictionary32Builder = Dictionary32Builder<LargeBinaryType>;
+using LargeStringDictionary32Builder = Dictionary32Builder<LargeStringType>;

 /// @}

diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 48228d43ef932..64d2893a9725c 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -678,6 +678,8 @@ class ARROW_EXPORT BaseBinaryType : public DataType {

 constexpr int64_t kBinaryMemoryLimit = std::numeric_limits<int32_t>::max() - 1;

+constexpr int64_t kLargeBinaryMemoryLimit = std::numeric_limits<int64_t>::max() - 1;
+
 /// \addtogroup binary-datatypes
 ///
 /// @{
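A minimal usage sketch for the two new aliases (illustrative only; assumes this patch
is applied). Indices stay int32 in both variants; only the dictionary values use
64-bit offsets:

  arrow::Status BuildExample(std::shared_ptr<arrow::Array>* out) {
    arrow::LargeStringDictionary32Builder builder;  // alias added above
    ARROW_RETURN_NOT_OK(builder.Append("foo"));
    ARROW_RETURN_NOT_OK(builder.Append("bar"));
    ARROW_RETURN_NOT_OK(builder.Append("foo"));  // deduplicated via the memo table
    ARROW_RETURN_NOT_OK(builder.Finish(out));
    // (*out)->type() is dictionary(int32(), large_utf8())
    return arrow::Status::OK();
  }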
diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index ad33ca296a283..2f3e8953daaf0 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -438,11 +438,11 @@ void CheckConfiguredRoundtrip(
 void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, bool use_threads,
                        int64_t row_group_size, const std::vector<int>& column_subset,
                        std::shared_ptr<Table>* out,
-                       const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
-                           default_arrow_writer_properties()) {
+                       const std::shared_ptr<ArrowWriterProperties>&
+                           arrow_writer_properties = default_arrow_writer_properties()) {
   std::shared_ptr<Buffer> buffer;
   ASSERT_NO_FATAL_FAILURE(
-      WriteTableToBuffer(table, row_group_size, arrow_properties, &buffer));
+      WriteTableToBuffer(table, row_group_size, arrow_writer_properties, &buffer));

   std::unique_ptr<FileReader> reader;
   ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
@@ -610,9 +610,18 @@ class ParquetIOTestBase : public ::testing::Test {
   }

   void ReaderFromSink(std::unique_ptr<FileReader>* out) {
+    return ReaderFromSink(out, default_arrow_reader_properties());
+  }
+
+  void ReaderFromSink(std::unique_ptr<FileReader>* out,
+                      const ArrowReaderProperties& arrow_reader_properties) {
     ASSERT_OK_AND_ASSIGN(auto buffer, sink_->Finish());
-    ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
-                                ::arrow::default_memory_pool(), out));
+
+    FileReaderBuilder builder;
+    ASSERT_OK_NO_THROW(builder.Open(std::make_shared<BufferReader>(buffer)));
+    ASSERT_OK_NO_THROW(builder.properties(arrow_reader_properties)
+                           ->memory_pool(::arrow::default_memory_pool())
+                           ->Build(out));
   }

   void ReadSingleColumnFile(std::unique_ptr<FileReader> file_reader,
@@ -660,18 +669,20 @@ class ParquetIOTestBase : public ::testing::Test {
   void RoundTripSingleColumn(
       const std::shared_ptr<Array>& values, const std::shared_ptr<Array>& expected,
-      const std::shared_ptr<::parquet::ArrowWriterProperties>& arrow_properties,
+      const std::shared_ptr<::parquet::ArrowWriterProperties>& arrow_writer_properties,
+      const ArrowReaderProperties& arrow_reader_properties =
+          default_arrow_reader_properties(),
       bool nullable = true) {
    std::shared_ptr<Table> table = MakeSimpleTable(values, nullable);
     this->ResetSink();
     ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
                                   values->length(), default_writer_properties(),
-                                  arrow_properties));
+                                  arrow_writer_properties));

     std::shared_ptr<Table> out;
     std::unique_ptr<FileReader> reader;
-    ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader));
-    const bool expect_metadata = arrow_properties->store_schema();
+    ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader, arrow_reader_properties));
+    const bool expect_metadata = arrow_writer_properties->store_schema();
     ASSERT_NO_FATAL_FAILURE(
         this->ReadTableFromFile(std::move(reader), expect_metadata, &out));
     ASSERT_EQ(1, out->num_columns());
@@ -1342,6 +1353,23 @@ TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compatibility) {

 using TestStringParquetIO = TestParquetIO<::arrow::StringType>;

+#if defined(_WIN64) || defined(__LP64__)
+TEST_F(TestStringParquetIO, SmallStringWithLargeBinaryVariantSetting) {
+  auto values = ArrayFromJSON(::arrow::utf8(), R"(["foo", "", null, "bar"])");
+
+  this->RoundTripSingleColumn(values, values, default_arrow_writer_properties());
+
+  ArrowReaderProperties arrow_reader_properties;
+  arrow_reader_properties.set_use_large_binary_variants(true);
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> casted,
+                       ::arrow::compute::Cast(*values, ::arrow::large_utf8()));
+
+  this->RoundTripSingleColumn(values, casted, default_arrow_writer_properties(),
+                              arrow_reader_properties);
+}
+#endif
+
 TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {
   std::shared_ptr<Array> values;
   ::arrow::StringBuilder builder;
@@ -1369,6 +1397,7 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {

 using TestLargeBinaryParquetIO = TestParquetIO<::arrow::LargeBinaryType>;

+#if defined(_WIN64) || defined(__LP64__)
 TEST_F(TestLargeBinaryParquetIO, Basics) {
   const char* json = "[\"foo\", \"\", null, \"\xff\"]";

@@ -1388,6 +1417,13 @@ TEST_F(TestLargeBinaryParquetIO, Basics) {
   const auto arrow_properties =
       ::parquet::ArrowWriterProperties::Builder().store_schema()->build();
   this->RoundTripSingleColumn(large_array, large_array, arrow_properties);
+
+  ArrowReaderProperties arrow_reader_properties;
+  arrow_reader_properties.set_use_large_binary_variants(true);
+  // The input is a narrow array, but the expected output is a large array, the
+  // opposite of the tests above. This validates that narrow arrays can be read
+  // back as large arrays.
+  this->RoundTripSingleColumn(narrow_array, large_array,
+                              default_arrow_writer_properties(), arrow_reader_properties);
 }

 using TestLargeStringParquetIO = TestParquetIO<::arrow::LargeStringType>;

@@ -1412,6 +1448,7 @@ TEST_F(TestLargeStringParquetIO, Basics) {
       ::parquet::ArrowWriterProperties::Builder().store_schema()->build();
   this->RoundTripSingleColumn(large_array, large_array, arrow_properties);
 }
+#endif

 using TestNullParquetIO = TestParquetIO<::arrow::NullType>;

@@ -3834,13 +3871,14 @@ TEST(TestImpalaConversion, ArrowTimestampToImpalaTimestamp) {
   ASSERT_EQ(expected, calculated);
 }

-void TryReadDataFile(const std::string& path,
-                     ::arrow::StatusCode expected_code = ::arrow::StatusCode::OK) {
+void TryReadDataFileWithProperties(
+    const std::string& path, const ArrowReaderProperties& properties,
+    ::arrow::StatusCode expected_code = ::arrow::StatusCode::OK) {
   auto pool = ::arrow::default_memory_pool();

   std::unique_ptr<FileReader> arrow_reader;
-  Status s =
-      FileReader::Make(pool, ParquetFileReader::OpenFile(path, false), &arrow_reader);
+  Status s = FileReader::Make(pool, ParquetFileReader::OpenFile(path, false), properties,
+                              &arrow_reader);
   if (s.ok()) {
     std::shared_ptr<::arrow::Table> table;
     s = arrow_reader->ReadTable(&table);
@@ -3851,6 +3889,11 @@ void TryReadDataFile(const std::string& path,
               << ", but got " << s.ToString();
 }

+void TryReadDataFile(const std::string& path,
+                     ::arrow::StatusCode expected_code = ::arrow::StatusCode::OK) {
+  TryReadDataFileWithProperties(path, default_arrow_reader_properties(), expected_code);
+}
+
 TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) {
   // PARQUET-995
   TryReadDataFile(test::get_data_file("alltypes_plain.parquet"));
@@ -3862,6 +3905,19 @@ TEST(TestArrowReaderAdHoc, CorruptedSchema) {
   TryReadDataFile(path, ::arrow::StatusCode::IOError);
 }

+#if defined(ARROW_WITH_BROTLI) && defined(__LP64__)
+TEST(TestArrowParquet, LargeByteArray) {
+  auto path = test::get_data_file("large_string_map.brotli.parquet");
+  TryReadDataFile(path, ::arrow::StatusCode::NotImplemented);
+  ArrowReaderProperties reader_properties;
+  reader_properties.set_use_large_binary_variants(true);
+  reader_properties.set_read_dictionary(0, false);
+  TryReadDataFileWithProperties(path, reader_properties);
+  reader_properties.set_read_dictionary(0, true);
+  TryReadDataFileWithProperties(path, reader_properties);
+}
+#endif
+
 TEST(TestArrowReaderAdHoc, LARGE_MEMORY_TEST(LargeStringColumn)) {
   // ARROW-3762
   ::arrow::StringBuilder builder;
@@ -4548,16 +4604,22 @@ TEST(TestArrowWriteDictionaries, NestedSubfield) {

 class TestArrowReadDeltaEncoding : public ::testing::Test {
  public:
   void ReadTableFromParquetFile(const std::string& file_name,
+                                const ArrowReaderProperties& properties,
                                std::shared_ptr<Table>* out) {
     auto file = test::get_data_file(file_name);
     auto pool = ::arrow::default_memory_pool();

     std::unique_ptr<FileReader> parquet_reader;
-    ASSERT_OK(FileReader::Make(pool, ParquetFileReader::OpenFile(file, false),
+    ASSERT_OK(FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), properties,
                                &parquet_reader));
     ASSERT_OK(parquet_reader->ReadTable(out));
     ASSERT_OK((*out)->ValidateFull());
   }

+  void ReadTableFromParquetFile(const std::string& file_name,
+                                std::shared_ptr<Table>* out) {
+    return ReadTableFromParquetFile(file_name, default_arrow_reader_properties(), out);
+  }
+
   void ReadTableFromCSVFile(const std::string& file_name,
                             const ::arrow::csv::ConvertOptions& convert_options,
                            std::shared_ptr<Table>* out) {
@@ -4605,6 +4667,27 @@ TEST_F(TestArrowReadDeltaEncoding, DeltaByteArray) {
   ::arrow::AssertTablesEqual(*actual_table, *expect_table, false);
 }

+TEST_F(TestArrowReadDeltaEncoding, DeltaByteArrayWithLargeBinaryVariant) {
+  std::shared_ptr<::arrow::Table> actual_table, expect_table;
+  ArrowReaderProperties properties;
+  properties.set_use_large_binary_variants(true);
+
+  ReadTableFromParquetFile("delta_byte_array.parquet", properties, &actual_table);
+
+  auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
+  std::vector<std::string> column_names = {
+      "c_customer_id", "c_salutation",          "c_first_name",
+      "c_last_name",   "c_preferred_cust_flag", "c_birth_country",
+      "c_login",       "c_email_address",       "c_last_review_date"};
+  for (auto name : column_names) {
+    convert_options.column_types[name] = ::arrow::large_utf8();
+  }
+  convert_options.strings_can_be_null = true;
+  ReadTableFromCSVFile("delta_byte_array_expect.csv", convert_options, &expect_table);
+
+  ::arrow::AssertTablesEqual(*actual_table, *expect_table, false);
+}
+
 TEST_F(TestArrowReadDeltaEncoding, IncrementalDecodeDeltaByteArray) {
   auto file = test::get_data_file("delta_byte_array.parquet");
   auto pool = ::arrow::default_memory_pool();
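For reference, the reader-side flow these tests exercise looks like this outside the
test harness (illustrative sketch; error handling reduced to status returns):

  arrow::Status ReadAsLargeVariants(std::shared_ptr<arrow::io::RandomAccessFile> input,
                                    std::shared_ptr<arrow::Table>* out) {
    parquet::ArrowReaderProperties properties;
    properties.set_use_large_binary_variants(true);  // BYTE_ARRAY -> large_utf8()/large_binary()

    parquet::arrow::FileReaderBuilder builder;
    ARROW_RETURN_NOT_OK(builder.Open(std::move(input)));
    std::unique_ptr<parquet::arrow::FileReader> reader;
    ARROW_RETURN_NOT_OK(builder.properties(properties)
                            ->memory_pool(arrow::default_memory_pool())
                            ->Build(&reader));
    return reader->ReadTable(out);
  }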
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 40fbdcbb562b1..c74a93f419e5c 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -219,6 +219,7 @@ class FileReaderImpl : public FileReader {
     ctx->iterator_factory = SomeRowGroupsFactory(row_groups);
     ctx->filter_leaves = true;
     ctx->included_leaves = included_leaves;
+    ctx->use_large_binary_variants = reader_properties_.use_large_binary_variants();
     return GetReader(manifest_.schema_fields[i], ctx, out);
   }

@@ -462,7 +463,8 @@ class LeafReader : public ColumnReaderImpl {
         input_(std::move(input)),
         descr_(input_->descr()) {
     record_reader_ = RecordReader::Make(
-        descr_, leaf_info, ctx_->pool, field_->type()->id() == ::arrow::Type::DICTIONARY);
+        descr_, leaf_info, ctx_->pool, field_->type()->id() == ::arrow::Type::DICTIONARY,
+        /*read_dense_for_nullable*/ false, ctx_->use_large_binary_variants);
     NextRowGroup();
   }

@@ -1218,6 +1220,7 @@ Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_facto
   ctx->pool = pool_;
   ctx->iterator_factory = iterator_factory;
   ctx->filter_leaves = false;
+  ctx->use_large_binary_variants = reader_properties_.use_large_binary_variants();

   std::unique_ptr<ColumnReaderImpl> result;
   RETURN_NOT_OK(GetReader(manifest_.schema_fields[i], ctx, &result));
   *out = std::move(result);
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index a294b712a7ce3..a1c40df747706 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -487,8 +487,9 @@ Status TransferBinary(RecordReader* reader, MemoryPool* pool,
   auto chunks = binary_reader->GetBuilderChunks();
   for (auto& chunk : chunks) {
     if (!chunk->type()->Equals(*logical_type_field->type())) {
-      // XXX: if a LargeBinary chunk is larger than 2GB, the MSBs of offsets
-      // will be lost because they are first created as int32 and then cast to int64.
+      // If a LargeBinary chunk is larger than 2GB and use_large_binary_variants
+      // is not set, the MSBs of offsets will be lost because they are first created
+      // as int32 and then cast to int64.
       ARROW_ASSIGN_OR_RAISE(
           chunk, ::arrow::compute::Cast(*chunk, logical_type_field->type(), cast_options,
                                         &ctx));
diff --git a/cpp/src/parquet/arrow/reader_internal.h b/cpp/src/parquet/arrow/reader_internal.h
index cf9dbb86577b5..6a904f3d45b6e 100644
--- a/cpp/src/parquet/arrow/reader_internal.h
+++ b/cpp/src/parquet/arrow/reader_internal.h
@@ -109,6 +109,7 @@ struct ReaderContext {
   FileColumnIteratorFactory iterator_factory;
   bool filter_leaves;
   std::shared_ptr<std::unordered_set<int>> included_leaves;
+  bool use_large_binary_variants = false;

   bool IncludesLeaf(int leaf_index) const {
     if (this->filter_leaves) {
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index c5d5e0743a7f1..b58ebedb62737 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -462,7 +462,9 @@ struct SchemaTreeContext {

 bool IsDictionaryReadSupported(const ArrowType& type) {
   // Only supported currently for BYTE_ARRAY types
-  return type.id() == ::arrow::Type::BINARY || type.id() == ::arrow::Type::STRING;
+  return type.id() == ::arrow::Type::BINARY || type.id() == ::arrow::Type::STRING ||
+         type.id() == ::arrow::Type::LARGE_BINARY ||
+         type.id() == ::arrow::Type::LARGE_STRING;
 }

 // ----------------------------------------------------------------------

@@ -473,7 +475,8 @@ ::arrow::Result<std::shared_ptr<ArrowType>> GetTypeForNode(
     SchemaTreeContext* ctx) {
   ASSIGN_OR_RAISE(
       std::shared_ptr<ArrowType> storage_type,
-      GetArrowType(primitive_node, ctx->properties.coerce_int96_timestamp_unit()));
+      GetArrowType(primitive_node, ctx->properties.coerce_int96_timestamp_unit(),
+                   ctx->properties.use_large_binary_variants()));
   if (ctx->properties.read_dictionary(column_index) &&
       IsDictionaryReadSupported(*storage_type)) {
     return ::arrow::dictionary(::arrow::int32(), storage_type);
diff --git a/cpp/src/parquet/arrow/schema_internal.cc b/cpp/src/parquet/arrow/schema_internal.cc
index 064bf4f55cc7e..b399b1f83dbdd 100644
--- a/cpp/src/parquet/arrow/schema_internal.cc
+++ b/cpp/src/parquet/arrow/schema_internal.cc
@@ -110,17 +110,18 @@ Result<std::shared_ptr<ArrowType>> MakeArrowTimestamp(const LogicalType& logical
   }
 }

-Result<std::shared_ptr<ArrowType>> FromByteArray(const LogicalType& logical_type) {
+Result<std::shared_ptr<ArrowType>> FromByteArray(const LogicalType& logical_type,
+                                                 bool use_large_binary_variants) {
   switch (logical_type.type()) {
     case LogicalType::Type::STRING:
-      return ::arrow::utf8();
+      return use_large_binary_variants ? ::arrow::large_utf8() : ::arrow::utf8();
     case LogicalType::Type::DECIMAL:
       return MakeArrowDecimal(logical_type);
     case LogicalType::Type::NONE:
     case LogicalType::Type::ENUM:
     case LogicalType::Type::JSON:
     case LogicalType::Type::BSON:
-      return ::arrow::binary();
+      return use_large_binary_variants ? ::arrow::large_binary() : ::arrow::binary();
    default:
       return Status::NotImplemented("Unhandled logical logical_type ",
                                     logical_type.ToString(), " for binary array");
   }
 }

@@ -181,7 +182,7 @@ Result<std::shared_ptr<ArrowType>> FromInt64(const LogicalType& logical_type) {

 Result<std::shared_ptr<ArrowType>> GetArrowType(
     Type::type physical_type, const LogicalType& logical_type, int type_length,
-    const ::arrow::TimeUnit::type int96_arrow_time_unit) {
+    const ::arrow::TimeUnit::type int96_arrow_time_unit, bool use_large_binary_variants) {
   if (logical_type.is_invalid() || logical_type.is_null()) {
     return ::arrow::null();
   }
@@ -200,7 +201,7 @@ Result<std::shared_ptr<ArrowType>> GetArrowType(
     case ParquetType::DOUBLE:
       return ::arrow::float64();
     case ParquetType::BYTE_ARRAY:
-      return FromByteArray(logical_type);
+      return FromByteArray(logical_type, use_large_binary_variants);
     case ParquetType::FIXED_LEN_BYTE_ARRAY:
       return FromFLBA(logical_type, type_length);
     default: {
@@ -213,9 +214,10 @@ Result<std::shared_ptr<ArrowType>> GetArrowType(

 Result<std::shared_ptr<ArrowType>> GetArrowType(
     const schema::PrimitiveNode& primitive,
-    const ::arrow::TimeUnit::type int96_arrow_time_unit) {
+    const ::arrow::TimeUnit::type int96_arrow_time_unit, bool use_large_binary_variants) {
   return GetArrowType(primitive.physical_type(), *primitive.logical_type(),
-                      primitive.type_length(), int96_arrow_time_unit);
+                      primitive.type_length(), int96_arrow_time_unit,
+                      use_large_binary_variants);
 }

 }  // namespace arrow
diff --git a/cpp/src/parquet/arrow/schema_internal.h b/cpp/src/parquet/arrow/schema_internal.h
index fb837c3ee6cab..67aecf6e73f1a 100644
--- a/cpp/src/parquet/arrow/schema_internal.h
+++ b/cpp/src/parquet/arrow/schema_internal.h
@@ -29,7 +29,9 @@ namespace arrow {

 using ::arrow::Result;

-Result<std::shared_ptr<::arrow::DataType>> FromByteArray(const LogicalType& logical_type);
+Result<std::shared_ptr<::arrow::DataType>> FromByteArray(const LogicalType& logical_type,
+                                                         bool use_large_binary_variants);
+
 Result<std::shared_ptr<::arrow::DataType>> FromFLBA(const LogicalType& logical_type,
                                                     int32_t physical_length);
 Result<std::shared_ptr<::arrow::DataType>> FromInt32(const LogicalType& logical_type);
@@ -37,15 +39,18 @@ Result<std::shared_ptr<::arrow::DataType>> FromInt64(const LogicalType& logical_

 Result<std::shared_ptr<::arrow::DataType>> GetArrowType(Type::type physical_type,
                                                         const LogicalType& logical_type,
-                                                        int type_length);
+                                                        int type_length,
+                                                        bool use_large_binary_variants);

 Result<std::shared_ptr<::arrow::DataType>> GetArrowType(
     Type::type physical_type, const LogicalType& logical_type, int type_length,
-    ::arrow::TimeUnit::type int96_arrow_time_unit = ::arrow::TimeUnit::NANO);
+    ::arrow::TimeUnit::type int96_arrow_time_unit = ::arrow::TimeUnit::NANO,
+    bool use_large_binary_variants = false);

 Result<std::shared_ptr<::arrow::DataType>> GetArrowType(
     const schema::PrimitiveNode& primitive,
-    ::arrow::TimeUnit::type int96_arrow_time_unit = ::arrow::TimeUnit::NANO);
+    ::arrow::TimeUnit::type int96_arrow_time_unit = ::arrow::TimeUnit::NANO,
+    bool use_large_binary_variants = false);

 }  // namespace arrow
 }  // namespace parquet
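The resulting BYTE_ARRAY type mapping, for reference (STRING etc. denote Parquet
logical types; DECIMAL is unaffected because it maps to Arrow decimal types):

  use_large_binary_variants   STRING         NONE/ENUM/JSON/BSON
  false (default)             utf8()         binary()
  true                        large_utf8()   large_binary()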
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 3294aaaf283f1..cf2511db04530 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -2093,15 +2093,44 @@ class FLBARecordReader : public TypedRecordReader<FLBAType>,
   std::unique_ptr<::arrow::FixedSizeBinaryBuilder> builder_;
 };

-class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
-                                     virtual public BinaryRecordReader {
+// TODO: The concept below could be used to simplify the type assertion in C++20.
+// template <typename T>
+// concept ByteArrayTypeConcept = std::is_same<T, ByteArrayType>::value ||
+//                                std::is_same<T, LargeByteArrayType>::value;
+
+template <typename T>
+struct IsByteArrayType : std::false_type {};
+
+template <>
+struct IsByteArrayType<ByteArrayType> : std::true_type {};
+
+template <>
+struct IsByteArrayType<LargeByteArrayType> : std::true_type {};
+
+template <typename T>
+struct ByteArrayBuilderTypeTrait {
+  using BuilderType =
+      typename std::conditional<std::is_same<T, LargeByteArrayType>::value,
+                                ::arrow::LargeBinaryBuilder,
+                                ::arrow::BinaryBuilder>::type;
+};
+
+template <typename T>
+class ByteArrayChunkedRecordReaderImpl : public TypedRecordReader<T>,
+                                         virtual public BinaryRecordReader {
  public:
-  ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
-                               ::arrow::MemoryPool* pool, bool read_dense_for_nullable)
-      : TypedRecordReader<ByteArrayType>(descr, leaf_info, pool,
-                                         read_dense_for_nullable) {
+  using BASE = TypedRecordReader<T>;
+  using BASE::descr_;
+  using BASE::ResetValues;
+  using BuilderType = typename ByteArrayBuilderTypeTrait<T>::BuilderType;
+
+  ByteArrayChunkedRecordReaderImpl(const ColumnDescriptor* descr, LevelInfo leaf_info,
+                                   ::arrow::MemoryPool* pool,
+                                   bool read_dense_for_nullable)
+      : TypedRecordReader<T>(descr, leaf_info, pool, read_dense_for_nullable) {
+    static_assert(IsByteArrayType<T>::value, "Invalid ByteArrayType");
     ARROW_DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
-    accumulator_.builder = std::make_unique<::arrow::BinaryBuilder>(pool);
+    accumulator_.builder = std::make_unique<BuilderType>(pool);
   }

   ::arrow::ArrayVector GetBuilderChunks() override {
@@ -2132,15 +2161,25 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,

  private:
   // Helper data structure for accumulating builder chunks
-  typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;
+  typename EncodingTraits<T>::Accumulator accumulator_;
 };

-class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>,
-                                        virtual public DictionaryRecordReader {
+using ByteArrayChunkedRecordReader = ByteArrayChunkedRecordReaderImpl<ByteArrayType>;
+using LargeByteArrayChunkedRecordReader =
+    ByteArrayChunkedRecordReaderImpl<LargeByteArrayType>;
+
+template <typename T>
+class ByteArrayDictionaryRecordReaderImpl : public TypedRecordReader<T>,
+                                            virtual public DictionaryRecordReader {
+  using BASE = TypedRecordReader<T>;
+  using BASE::current_encoding_;
+  using BASE::ResetValues;
+
  public:
-  ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
-                                  ::arrow::MemoryPool* pool, bool read_dense_for_nullable)
-      : TypedRecordReader<ByteArrayType>(descr, leaf_info, pool, read_dense_for_nullable),
+  ByteArrayDictionaryRecordReaderImpl(const ColumnDescriptor* descr, LevelInfo leaf_info,
+                                      ::arrow::MemoryPool* pool,
+                                      bool read_dense_for_nullable)
+      : TypedRecordReader<T>(descr, leaf_info, pool, read_dense_for_nullable),
         builder_(pool) {
     this->read_dictionary_ = true;
   }
@@ -2211,12 +2250,17 @@ class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>,
   }

  private:
-  using BinaryDictDecoder = DictDecoder<ByteArrayType>;
+  using BinaryDictDecoder = DictDecoder<T>;

-  ::arrow::BinaryDictionary32Builder builder_;
+  typename EncodingTraits<T>::DictAccumulator builder_;
   std::vector<std::shared_ptr<::arrow::Array>> result_chunks_;
 };

+using ByteArrayDictionaryRecordReader =
+    ByteArrayDictionaryRecordReaderImpl<ByteArrayType>;
+using LargeByteArrayDictionaryRecordReader =
+    ByteArrayDictionaryRecordReaderImpl<LargeByteArrayType>;
+
 // TODO(wesm): Implement these to some satisfaction
 template <>
 void TypedRecordReader<Int96Type>::DebugPrintState() {}

@@ -2224,6 +2268,9 @@ void TypedRecordReader<Int96Type>::DebugPrintState() {}
 template <>
 void TypedRecordReader<ByteArrayType>::DebugPrintState() {}

+template <>
+void TypedRecordReader<LargeByteArrayType>::DebugPrintState() {}
+
 template <>
 void TypedRecordReader<FLBAType>::DebugPrintState() {}
@@ -2241,12 +2288,25 @@ std::shared_ptr<RecordReader> MakeByteArrayRecordReader(const ColumnDescriptor*
   }
 }

+std::shared_ptr<RecordReader> MakeLargeByteArrayRecordReader(
+    const ColumnDescriptor* descr, LevelInfo leaf_info, ::arrow::MemoryPool* pool,
+    bool read_dictionary, bool read_dense_for_nullable) {
+  if (read_dictionary) {
+    return std::make_shared<LargeByteArrayDictionaryRecordReader>(
+        descr, leaf_info, pool, read_dense_for_nullable);
+  } else {
+    return std::make_shared<LargeByteArrayChunkedRecordReader>(descr, leaf_info, pool,
+                                                               read_dense_for_nullable);
+  }
+}
+
 }  // namespace

 std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr,
                                                  LevelInfo leaf_info, MemoryPool* pool,
                                                  bool read_dictionary,
-                                                 bool read_dense_for_nullable) {
+                                                 bool read_dense_for_nullable,
+                                                 bool use_large_binary_variants) {
   switch (descr->physical_type()) {
     case Type::BOOLEAN:
       return std::make_shared<TypedRecordReader<BooleanType>>(descr, leaf_info, pool,
@@ -2267,8 +2327,11 @@ std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr,
       return std::make_shared<TypedRecordReader<DoubleType>>(descr, leaf_info, pool,
                                                              read_dense_for_nullable);
     case Type::BYTE_ARRAY: {
-      return MakeByteArrayRecordReader(descr, leaf_info, pool, read_dictionary,
-                                       read_dense_for_nullable);
+      return use_large_binary_variants
+                 ? MakeLargeByteArrayRecordReader(descr, leaf_info, pool, read_dictionary,
+                                                  read_dense_for_nullable)
+                 : MakeByteArrayRecordReader(descr, leaf_info, pool, read_dictionary,
+                                             read_dense_for_nullable);
     }
     case Type::FIXED_LEN_BYTE_ARRAY:
       return std::make_shared<FLBARecordReader>(descr, leaf_info, pool,
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index 334b8bcffe0b8..7e938310a9839 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -321,7 +321,8 @@ class PARQUET_EXPORT RecordReader {
   static std::shared_ptr<RecordReader> Make(
       const ColumnDescriptor* descr, LevelInfo leaf_info,
       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
-      bool read_dictionary = false, bool read_dense_for_nullable = false);
+      bool read_dictionary = false, bool read_dense_for_nullable = false,
+      bool use_large_binary_variants = false);

   virtual ~RecordReader() = default;
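A sketch of driving the extended factory directly (illustrative; "descr" and
"leaf_info" are assumed to come from the column's schema descriptor):

  // With use_large_binary_variants=true, BYTE_ARRAY columns accumulate into
  // LargeBinary chunks (LargeByteArrayChunkedRecordReader above); with
  // read_dictionary=true they go through LargeByteArrayDictionaryRecordReader.
  std::shared_ptr<parquet::internal::RecordReader> record_reader =
      parquet::internal::RecordReader::Make(
          descr, leaf_info, arrow::default_memory_pool(),
          /*read_dictionary=*/false, /*read_dense_for_nullable=*/false,
          /*use_large_binary_variants=*/true);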
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 134a22f28412b..bb931ecb5e929 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -1238,19 +1238,33 @@ int PlainBooleanDecoder::Decode(bool* buffer, int max_values) {
   return max_values;
 }

-struct ArrowBinaryHelper {
-  explicit ArrowBinaryHelper(typename EncodingTraits<ByteArrayType>::Accumulator* out) {
+template <typename T>
+struct ArrowBinaryHelperTraits;
+
+template <>
+struct ArrowBinaryHelperTraits<ByteArrayType> {
+  static constexpr auto memory_limit = ::arrow::kBinaryMemoryLimit;
+};
+
+template <>
+struct ArrowBinaryHelperTraits<LargeByteArrayType> {
+  static constexpr auto memory_limit = ::arrow::kLargeBinaryMemoryLimit;
+};
+
+template <typename T>
+struct ArrowBinaryHelperBase {
+  explicit ArrowBinaryHelperBase(typename EncodingTraits<T>::Accumulator* out) {
     this->out = out;
     this->builder = out->builder.get();
     this->chunk_space_remaining =
-        ::arrow::kBinaryMemoryLimit - this->builder->value_data_length();
+        ArrowBinaryHelperTraits<T>::memory_limit - this->builder->value_data_length();
   }

   Status PushChunk() {
     std::shared_ptr<::arrow::Array> result;
     RETURN_NOT_OK(builder->Finish(&result));
     out->chunks.push_back(result);
-    chunk_space_remaining = ::arrow::kBinaryMemoryLimit;
+    chunk_space_remaining = ArrowBinaryHelperTraits<T>::memory_limit;
     return Status::OK();
   }

@@ -1270,11 +1284,13 @@ struct ArrowBinaryHelper {

   Status AppendNull() { return builder->AppendNull(); }

-  typename EncodingTraits<ByteArrayType>::Accumulator* out;
-  ::arrow::BinaryBuilder* builder;
+  typename EncodingTraits<T>::Accumulator* out;
+  typename EncodingTraits<T>::BinaryBuilder* builder;
   int64_t chunk_space_remaining;
 };

+using ArrowBinaryHelper = ArrowBinaryHelperBase<ByteArrayType>;
+
 template <>
 inline int PlainDecoder<ByteArrayType>::DecodeArrow(
     int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
@@ -1289,6 +1305,20 @@ inline int PlainDecoder<ByteArrayType>::DecodeArrow(
   ParquetException::NYI();
 }

+template <>
+inline int PlainDecoder<LargeByteArrayType>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<LargeByteArrayType>::Accumulator* builder) {
+  ParquetException::NYI();
+}
+
+template <>
+inline int PlainDecoder<LargeByteArrayType>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<LargeByteArrayType>::DictAccumulator* builder) {
+  ParquetException::NYI();
+}
+
 template <>
 inline int PlainDecoder<FLBAType>::DecodeArrow(
     int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
@@ -1337,11 +1367,15 @@ inline int PlainDecoder<FLBAType>::DecodeArrow(
   return values_decoded;
 }

-class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
-                              virtual public ByteArrayDecoder {
+template <typename T>
+class PlainByteArrayDecoderBase : public PlainDecoder<T>,
+                                  virtual public TypedDecoder<T> {
  public:
-  using Base = PlainDecoder<ByteArrayType>;
+  using Base = PlainDecoder<T>;
+  using Base::data_;
   using Base::DecodeSpaced;
+  using Base::len_;
+  using Base::num_values_;
   using Base::PlainDecoder;

   // ----------------------------------------------------------------------
@@ -1349,7 +1383,7 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,

   int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                   int64_t valid_bits_offset,
-                  ::arrow::BinaryDictionary32Builder* builder) override {
+                  typename EncodingTraits<T>::DictAccumulator* builder) override {
     int result = 0;
     PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits,
                                      valid_bits_offset, builder, &result));
@@ -1361,7 +1395,7 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,

   int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                   int64_t valid_bits_offset,
-                  typename EncodingTraits<ByteArrayType>::Accumulator* out) override {
+                  typename EncodingTraits<T>::Accumulator* out) override {
     int result = 0;
     PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
                                           valid_bits_offset, out, &result));
@@ -1371,9 +1405,9 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
  private:
   Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
                           int64_t valid_bits_offset,
-                          typename EncodingTraits<ByteArrayType>::Accumulator* out,
+                          typename EncodingTraits<T>::Accumulator* out,
                           int* out_values_decoded) {
-    ArrowBinaryHelper helper(out);
+    ArrowBinaryHelperBase<T> helper(out);
     int values_decoded = 0;

     RETURN_NOT_OK(helper.builder->Reserve(num_values));
@@ -1455,6 +1489,9 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
   }
 };

+using PlainByteArrayDecoder = PlainByteArrayDecoderBase<ByteArrayType>;
+using PlainLargeByteArrayDecoder = PlainByteArrayDecoderBase<LargeByteArrayType>;
+
 class PlainFLBADecoder : public PlainDecoder<FLBAType>, virtual public FLBADecoder {
  public:
   using Base = PlainDecoder<FLBAType>;
@@ -1484,6 +1521,41 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {

   // Perform type-specific initiatialization
   void SetDict(TypedDecoder<Type>* dictionary) override;

+  template <typename T = Type,
+            typename = std::enable_if_t<std::is_same_v<T, ByteArrayType> ||
+                                        std::is_same_v<T, LargeByteArrayType>>>
+  void SetByteArrayDict(TypedDecoder<T>* dictionary) {
+    DecodeDict(dictionary);
+
+    auto dict_values = reinterpret_cast<ByteArray*>(dictionary_->mutable_data());
+
+    using offset_type = typename EncodingTraits<T>::ArrowType::offset_type;
+
+    offset_type total_size = 0;
+    for (int i = 0; i < dictionary_length_; ++i) {
+      if (AddWithOverflow(total_size, dict_values[i].len, &total_size)) {
+        throw ParquetException("String/Binary length too large");
+      }
+    }
+    PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size,
+                                                  /*shrink_to_fit=*/false));
+    PARQUET_THROW_NOT_OK(
+        byte_array_offsets_->Resize((dictionary_length_ + 1) * sizeof(offset_type),
+                                    /*shrink_to_fit=*/false));
+
+    offset_type offset = 0;
+    uint8_t* bytes_data = byte_array_data_->mutable_data();
+    auto* bytes_offsets =
+        reinterpret_cast<offset_type*>(byte_array_offsets_->mutable_data());
+    for (int i = 0; i < dictionary_length_; ++i) {
+      memcpy(bytes_data + offset, dict_values[i].ptr, dict_values[i].len);
+      bytes_offsets[i] = offset;
+      dict_values[i].ptr = bytes_data + offset;
+      offset += dict_values[i].len;
+    }
+    bytes_offsets[dictionary_length_] = offset;
+  }
+
   void SetData(int num_values, const uint8_t* data, int len) override {
     num_values_ = num_values;
     if (len == 0) {
@@ -1559,11 +1631,22 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {
         valid_bits, valid_bits_offset, num_values, null_count,
         [&]() { valid_bytes[i++] = 1; }, [&]() { ++i; });

-    auto binary_builder = checked_cast<::arrow::BinaryDictionary32Builder*>(builder);
-    PARQUET_THROW_NOT_OK(
-        binary_builder->AppendIndices(indices_buffer, num_values, valid_bytes.data()));
-    num_values_ -= num_values - null_count;
-    return num_values - null_count;
+    // This method appears to be called only for ByteArray types. Previously there
+    // was an unconditional cast to ::arrow::Dictionary32Builder<::arrow::BinaryType>,
+    // which won't work for LargeByteArrayType; the Type template argument cannot be
+    // used unconditionally either, because EncodingTraits<Type>::DictAccumulator is
+    // not defined for several other types.
+    if constexpr (std::is_same_v<Type, ByteArrayType> ||
+                  std::is_same_v<Type, LargeByteArrayType>) {
+      auto binary_builder =
+          checked_cast<typename EncodingTraits<Type>::DictAccumulator*>(builder);
+      PARQUET_THROW_NOT_OK(
+          binary_builder->AppendIndices(indices_buffer, num_values, valid_bytes.data()));
+      num_values_ -= num_values - null_count;
+      return num_values - null_count;
+    }
+
+    ParquetException::NYI("DecodeIndicesSpaced not implemented for this type");
   }

   int DecodeIndices(int num_values, ::arrow::ArrayBuilder* builder) override {
@@ -1580,10 +1663,22 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {
     if (num_values != idx_decoder_.GetBatch(indices_buffer, num_values)) {
       ParquetException::EofException();
     }
-    auto binary_builder = checked_cast<::arrow::BinaryDictionary32Builder*>(builder);
-    PARQUET_THROW_NOT_OK(binary_builder->AppendIndices(indices_buffer, num_values));
-    num_values_ -= num_values;
-    return num_values;
+
+    // As above: this method appears to be called only for ByteArray types, and the
+    // cast must be conditional because EncodingTraits<Type>::DictAccumulator is not
+    // defined for several other types.
+    if constexpr (std::is_same_v<Type, ByteArrayType> ||
+                  std::is_same_v<Type, LargeByteArrayType>) {
+      auto binary_builder =
+          checked_cast<typename EncodingTraits<Type>::DictAccumulator*>(builder);
+      PARQUET_THROW_NOT_OK(binary_builder->AppendIndices(indices_buffer, num_values));
+      num_values_ -= num_values;
+      return num_values;
+    }
+
+    ParquetException::NYI("DecodeIndices not implemented for this type");
   }

   int DecodeIndices(int num_values, int32_t* indices) override {
@@ -1650,31 +1745,13 @@ void DictDecoderImpl<Type>::SetDict(TypedDecoder<Type>* dictionary

 template <>
 void DictDecoderImpl<ByteArrayType>::SetDict(TypedDecoder<ByteArrayType>* dictionary) {
-  DecodeDict(dictionary);
-
-  auto dict_values = reinterpret_cast<ByteArray*>(dictionary_->mutable_data());
-
-  int total_size = 0;
-  for (int i = 0; i < dictionary_length_; ++i) {
-    total_size += dict_values[i].len;
-  }
-  PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size,
-                                                /*shrink_to_fit=*/false));
-  PARQUET_THROW_NOT_OK(
-      byte_array_offsets_->Resize((dictionary_length_ + 1) * sizeof(int32_t),
-                                  /*shrink_to_fit=*/false));
+  SetByteArrayDict(dictionary);
+}

-  int32_t offset = 0;
-  uint8_t* bytes_data = byte_array_data_->mutable_data();
-  int32_t* bytes_offsets =
-      reinterpret_cast<int32_t*>(byte_array_offsets_->mutable_data());
-  for (int i = 0; i < dictionary_length_; ++i) {
-    memcpy(bytes_data + offset, dict_values[i].ptr, dict_values[i].len);
-    bytes_offsets[i] = offset;
-    dict_values[i].ptr = bytes_data + offset;
-    offset += dict_values[i].len;
-  }
-  bytes_offsets[dictionary_length_] = offset;
+template <>
+void DictDecoderImpl<LargeByteArrayType>::SetDict(
+    TypedDecoder<LargeByteArrayType>* dictionary) {
+  SetByteArrayDict(dictionary);
 }

 template <>
@@ -1723,6 +1800,20 @@ inline int DictDecoderImpl<ByteArrayType>::DecodeArrow(
   ParquetException::NYI("DecodeArrow implemented elsewhere");
 }

+template <>
+inline int DictDecoderImpl<LargeByteArrayType>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<LargeByteArrayType>::Accumulator* builder) {
+  ParquetException::NYI("DecodeArrow implemented elsewhere");
+}
+
+template <>
+inline int DictDecoderImpl<LargeByteArrayType>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<LargeByteArrayType>::DictAccumulator* builder) {
+  ParquetException::NYI("DecodeArrow implemented elsewhere");
+}
+
 template <typename Type>
 int DictDecoderImpl<Type>::DecodeArrow(
     int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
@@ -1854,15 +1945,30 @@ void DictDecoderImpl<ByteArrayType>::InsertDictionary(::arrow::ArrayBuilder* bui
   PARQUET_THROW_NOT_OK(binary_builder->InsertMemoValues(*arr));
 }

-class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
-                                 virtual public ByteArrayDecoder {
+template <>
+void DictDecoderImpl<LargeByteArrayType>::InsertDictionary(
+    ::arrow::ArrayBuilder* builder) {
+  auto binary_builder = checked_cast<::arrow::LargeBinaryDictionary32Builder*>(builder);
+
+  // Make a LargeBinaryArray referencing the internal dictionary data
+  auto arr = std::make_shared<::arrow::LargeBinaryArray>(
+      dictionary_length_, byte_array_offsets_, byte_array_data_);
+  PARQUET_THROW_NOT_OK(binary_builder->InsertMemoValues(*arr));
+}
+
+template <typename T>
+class DictByteArrayDecoderImpl : public DictDecoderImpl<T>,
+                                 virtual public TypedDecoder<T> {
  public:
-  using BASE = DictDecoderImpl<ByteArrayType>;
+  using BASE = DictDecoderImpl<T>;
   using BASE::DictDecoderImpl;
+  using BASE::dictionary_;
+  using BASE::idx_decoder_;
+  using BASE::IndexInBounds;

   int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                   int64_t valid_bits_offset,
-                  ::arrow::BinaryDictionary32Builder* builder) override {
+                  typename EncodingTraits<T>::DictAccumulator* builder) override {
    int result = 0;
     if (null_count == 0) {
       PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, builder, &result));
@@ -1875,7 +1981,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,

   int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                   int64_t valid_bits_offset,
-                  typename EncodingTraits<ByteArrayType>::Accumulator* out) override {
+                  typename EncodingTraits<T>::Accumulator* out) override {
     int result = 0;
     if (null_count == 0) {
       PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull(num_values, out, &result));
@@ -1889,12 +1995,12 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
  private:
   Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
                           int64_t valid_bits_offset,
-                          typename EncodingTraits<ByteArrayType>::Accumulator* out,
+                          typename EncodingTraits<T>::Accumulator* out,
                           int* out_num_values) {
     constexpr int32_t kBufferSize = 1024;
     int32_t indices[kBufferSize];

-    ArrowBinaryHelper helper(out);
+    ArrowBinaryHelperBase<T> helper(out);

     auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
     int values_decoded = 0;
@@ -1957,13 +2063,13 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }

   Status DecodeArrowDenseNonNull(int num_values,
-                                 typename EncodingTraits<ByteArrayType>::Accumulator* out,
+                                 typename EncodingTraits<T>::Accumulator* out,
                                  int* out_num_values) {
     constexpr int32_t kBufferSize = 2048;
     int32_t indices[kBufferSize];
     int values_decoded = 0;

-    ArrowBinaryHelper helper(out);
+    ArrowBinaryHelperBase<T> helper(out);
     auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());

     while (values_decoded < num_values) {
@@ -2715,11 +2821,12 @@ std::shared_ptr<Buffer> DeltaLengthByteArrayEncoder::FlushValues() {

 // ----------------------------------------------------------------------
 // DeltaLengthByteArrayDecoder

-class DeltaLengthByteArrayDecoder : public DecoderImpl,
-                                    virtual public TypedDecoder<ByteArrayType> {
+template <typename T>
+class DeltaLengthByteArrayDecoderBase : public DecoderImpl,
+                                        virtual public TypedDecoder<T> {
  public:
-  explicit DeltaLengthByteArrayDecoder(const ColumnDescriptor* descr,
-                                       MemoryPool* pool = ::arrow::default_memory_pool())
+  explicit DeltaLengthByteArrayDecoderBase(
+      const ColumnDescriptor* descr, MemoryPool* pool = ::arrow::default_memory_pool())
       : DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
         len_decoder_(nullptr, pool),
         buffered_length_(AllocateBuffer(pool, 0)) {}
@@ -2769,7 +2876,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,

   int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                   int64_t valid_bits_offset,
-                  typename EncodingTraits<ByteArrayType>::Accumulator* out) override {
+                  typename EncodingTraits<T>::Accumulator* out) override {
     int result = 0;
     PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
                                           valid_bits_offset, out, &result));
@@ -2778,7 +2885,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,

   int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                   int64_t valid_bits_offset,
-                  typename EncodingTraits<ByteArrayType>::DictAccumulator* out) override {
+                  typename EncodingTraits<T>::DictAccumulator* out) override {
     ParquetException::NYI(
         "DecodeArrow of DictAccumulator for DeltaLengthByteArrayDecoder");
   }
@@ -2804,9 +2911,9 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,

   Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
                           int64_t valid_bits_offset,
-                          typename EncodingTraits<ByteArrayType>::Accumulator* out,
+                          typename EncodingTraits<T>::Accumulator* out,
                           int* out_num_values) {
-    ArrowBinaryHelper helper(out);
+    ArrowBinaryHelperBase<T> helper(out);

     std::vector<ByteArray> values(num_values - null_count);
    const int num_valid_values = Decode(values.data(), num_values - null_count);
@@ -2847,6 +2954,10 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
   std::shared_ptr<ResizableBuffer> buffered_length_;
 };

+using DeltaLengthByteArrayDecoder = DeltaLengthByteArrayDecoderBase<ByteArrayType>;
+using DeltaLengthLargeByteArrayDecoder =
+    DeltaLengthByteArrayDecoderBase<LargeByteArrayType>;
+
 // ----------------------------------------------------------------------
 // RLE_BOOLEAN_ENCODER

@@ -3037,11 +3148,11 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY

-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+template <typename T>
+class DeltaByteArrayDecoderBase : public DecoderImpl, virtual public TypedDecoder<T> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
-                                 MemoryPool* pool = ::arrow::default_memory_pool())
+  explicit DeltaByteArrayDecoderBase(const ColumnDescriptor* descr,
+                                     MemoryPool* pool = ::arrow::default_memory_pool())
       : DecoderImpl(descr, Encoding::DELTA_BYTE_ARRAY),
         prefix_len_decoder_(nullptr, pool),
         suffix_decoder_(nullptr, pool),
@@ -3083,17 +3194,16 @@ class DeltaByteArrayDecoder : public DecoderImpl,

   int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                   int64_t valid_bits_offset,
-                  typename EncodingTraits<ByteArrayType>::Accumulator* out) override {
+                  typename EncodingTraits<T>::Accumulator* out) override {
     int result = 0;
     PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
                                           valid_bits_offset, out, &result));
     return result;
   }

-  int DecodeArrow(
-      int num_values, int null_count, const uint8_t* valid_bits,
-      int64_t valid_bits_offset,
-      typename EncodingTraits<ByteArrayType>::DictAccumulator* builder) override {
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<T>::DictAccumulator* builder) override {
     ParquetException::NYI("DecodeArrow of DictAccumulator for DeltaByteArrayDecoder");
   }

@@ -3155,9 +3265,9 @@ class DeltaByteArrayDecoder : public DecoderImpl,

   Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
                           int64_t valid_bits_offset,
-                          typename EncodingTraits<ByteArrayType>::Accumulator* out,
+                          typename EncodingTraits<T>::Accumulator* out,
                           int* out_num_values) {
-    ArrowBinaryHelper helper(out);
+    ArrowBinaryHelperBase<T> helper(out);

     std::vector<ByteArray> values(num_values);
     const int num_valid_values = GetInternal(values.data(), num_values - null_count);
@@ -3190,7 +3300,7 @@ class DeltaByteArrayDecoder : public DecoderImpl,

   std::shared_ptr<::arrow::bit_util::BitReader> decoder_;
   DeltaBitPackDecoder<Int32Type> prefix_len_decoder_;
-  DeltaLengthByteArrayDecoder suffix_decoder_;
+  DeltaLengthByteArrayDecoderBase<T> suffix_decoder_;
   std::string last_value_;
   // string buffer for last value in previous page
   std::string last_value_in_previous_page_;
@@ -3200,6 +3310,9 @@ class DeltaByteArrayDecoder : public DecoderImpl,
   std::shared_ptr<ResizableBuffer> buffered_data_;
 };

+using DeltaByteArrayDecoder = DeltaByteArrayDecoderBase<ByteArrayType>;
+using DeltaLargeByteArrayDecoder = DeltaByteArrayDecoderBase<LargeByteArrayType>;
+
 // ----------------------------------------------------------------------
 // BYTE_STREAM_SPLIT

@@ -3422,7 +3535,8 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encodin

 std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encoding,
                                      const ColumnDescriptor* descr,
-                                     ::arrow::MemoryPool* pool) {
+                                     ::arrow::MemoryPool* pool,
+                                     bool use_large_binary_variants) {
   if (encoding == Encoding::PLAIN) {
    switch (type_num) {
       case Type::BOOLEAN:
@@ -3438,7 +3552,11 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encodin
       case Type::DOUBLE:
         return std::make_unique<PlainDecoder<DoubleType>>(descr);
       case Type::BYTE_ARRAY:
-        return std::make_unique<PlainByteArrayDecoder>(descr);
+        if (use_large_binary_variants) {
+          return std::make_unique<PlainLargeByteArrayDecoder>(descr);
+        } else {
+          return std::make_unique<PlainByteArrayDecoder>(descr);
+        }
       case Type::FIXED_LEN_BYTE_ARRAY:
         return std::make_unique<PlainFLBADecoder>(descr);
       default:
@@ -3465,12 +3583,20 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encodin
     }
   } else if (encoding == Encoding::DELTA_BYTE_ARRAY) {
     if (type_num == Type::BYTE_ARRAY) {
-      return std::make_unique<DeltaByteArrayDecoder>(descr, pool);
+      if (use_large_binary_variants) {
+        return std::make_unique<DeltaLargeByteArrayDecoder>(descr);
+      } else {
+        return std::make_unique<DeltaByteArrayDecoder>(descr);
+      }
     }
     throw ParquetException("DELTA_BYTE_ARRAY only supports BYTE_ARRAY");
   } else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) {
     if (type_num == Type::BYTE_ARRAY) {
-      return std::make_unique<DeltaLengthByteArrayDecoder>(descr, pool);
+      if (use_large_binary_variants) {
+        return std::make_unique<DeltaLengthLargeByteArrayDecoder>(descr, pool);
+      } else {
+        return std::make_unique<DeltaLengthByteArrayDecoder>(descr, pool);
+      }
     }
     throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY");
   } else if (encoding == Encoding::RLE) {
@@ -3487,8 +3613,8 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encodin
 namespace detail {

 std::unique_ptr<Decoder> MakeDictDecoder(Type::type type_num,
-                                         const ColumnDescriptor* descr,
-                                         MemoryPool* pool) {
+                                         const ColumnDescriptor* descr, MemoryPool* pool,
+                                         bool use_large_binary_variants) {
   switch (type_num) {
     case Type::BOOLEAN:
       ParquetException::NYI("Dictionary encoding not implemented for boolean type");
@@ -3503,7 +3629,12 @@ std::unique_ptr<Decoder> MakeDictDecoder(Type::type type_num,
     case Type::DOUBLE:
       return std::make_unique<DictDecoderImpl<DoubleType>>(descr, pool);
     case Type::BYTE_ARRAY:
-      return std::make_unique<DictByteArrayDecoderImpl<ByteArrayType>>(descr, pool);
+      if (use_large_binary_variants) {
+        return std::make_unique<DictByteArrayDecoderImpl<LargeByteArrayType>>(descr,
+                                                                              pool);
+      } else {
+        return std::make_unique<DictByteArrayDecoderImpl<ByteArrayType>>(descr, pool);
+      }
     case Type::FIXED_LEN_BYTE_ARRAY:
       return std::make_unique<DictDecoderImpl<FLBAType>>(descr, pool);
     default:
diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h
index 9f9b740ff3424..f61c5e5b642d2 100644
--- a/cpp/src/parquet/encoding.h
+++ b/cpp/src/parquet/encoding.h
@@ -60,6 +60,7 @@ using Int96Encoder = TypedEncoder<Int96Type>;
 using FloatEncoder = TypedEncoder<FloatType>;
 using DoubleEncoder = TypedEncoder<DoubleType>;
 using ByteArrayEncoder = TypedEncoder<ByteArrayType>;
+using LargeByteArrayEncoder = TypedEncoder<LargeByteArrayType>;
 using FLBAEncoder = TypedEncoder<FLBAType>;

 template <typename DType>
@@ -72,6 +73,7 @@ using Int96Decoder = TypedDecoder<Int96Type>;
 using FloatDecoder = TypedDecoder<FloatType>;
 using DoubleDecoder = TypedDecoder<DoubleType>;
 using ByteArrayDecoder = TypedDecoder<ByteArrayType>;
+using LargeByteArrayDecoder = TypedDecoder<LargeByteArrayType>;
 class FLBADecoder;

 template <typename DType>
@@ -140,17 +142,34 @@ template <>
 struct EncodingTraits<ByteArrayType> {
   using Encoder = ByteArrayEncoder;
   using Decoder = ByteArrayDecoder;
+  using BinaryBuilder = ::arrow::BinaryBuilder;

   /// \brief Internal helper class for decoding BYTE_ARRAY data where we can
   /// overflow the capacity of a single arrow::BinaryArray
   struct Accumulator {
-    std::unique_ptr<::arrow::BinaryBuilder> builder;
+    std::unique_ptr<BinaryBuilder> builder;
     std::vector<std::shared_ptr<::arrow::Array>> chunks;
   };
   using ArrowType = ::arrow::BinaryType;
   using DictAccumulator = ::arrow::Dictionary32Builder<::arrow::BinaryType>;
 };

+template <>
+struct EncodingTraits<LargeByteArrayType> {
+  using Encoder = LargeByteArrayEncoder;
+  using Decoder = LargeByteArrayDecoder;
+  using BinaryBuilder = ::arrow::LargeBinaryBuilder;
+
+  /// \brief Internal helper class for decoding BYTE_ARRAY data where we can
+  /// overflow the capacity of a single arrow::LargeBinaryArray
+  struct Accumulator {
+    std::unique_ptr<BinaryBuilder> builder;
+    std::vector<std::shared_ptr<::arrow::Array>> chunks;
+  };
+  using ArrowType = ::arrow::LargeBinaryType;
+  using DictAccumulator = ::arrow::Dictionary32Builder<::arrow::LargeBinaryType>;
+};
+
 template <>
 struct EncodingTraits<FLBAType> {
   using Encoder = FLBAEncoder;
@@ -437,14 +456,16 @@ std::unique_ptr<typename EncodingTraits<DType>::Encoder> MakeTypedEncoder(

 PARQUET_EXPORT
 std::unique_ptr<Decoder> MakeDecoder(
     Type::type type_num, Encoding::type encoding, const ColumnDescriptor* descr = NULLPTR,
-    ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+    ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
+    bool use_large_binary_variants = false);

 namespace detail {

 PARQUET_EXPORT
 std::unique_ptr<Decoder> MakeDictDecoder(Type::type type_num,
                                          const ColumnDescriptor* descr,
-                                         ::arrow::MemoryPool* pool);
+                                         ::arrow::MemoryPool* pool,
+                                         bool use_large_binary_variants);

 }  // namespace detail

@@ -453,7 +474,8 @@ std::unique_ptr<DictDecoder<DType>> MakeDictDecoder(
     const ColumnDescriptor* descr = NULLPTR,
     ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) {
   using OutType = DictDecoder<DType>;
-  auto decoder = detail::MakeDictDecoder(DType::type_num, descr, pool);
+  auto decoder = detail::MakeDictDecoder(DType::type_num, descr, pool,
+                                         std::is_same_v<DType, LargeByteArrayType>);
   return std::unique_ptr<OutType>(dynamic_cast<OutType*>(decoder.release()));
 }

@@ -462,7 +484,9 @@ std::unique_ptr<typename EncodingTraits<DType>::Decoder> MakeTypedDecoder(
     Encoding::type encoding, const ColumnDescriptor* descr = NULLPTR,
     ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) {
   using OutType = typename EncodingTraits<DType>::Decoder;
-  std::unique_ptr<Decoder> base = MakeDecoder(DType::type_num, encoding, descr, pool);
+
+  std::unique_ptr<Decoder> base = MakeDecoder(DType::type_num, encoding, descr, pool,
+                                              std::is_same_v<DType, LargeByteArrayType>);
   return std::unique_ptr<OutType>(dynamic_cast<OutType*>(base.release()));
 }
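Note the selection is driven entirely by the template argument, for example
(illustrative sketch; assumes this patch is applied):

  // Returns a std::unique_ptr<parquet::LargeByteArrayDecoder>; the trailing
  // std::is_same_v<DType, LargeByteArrayType> argument routes MakeDecoder to the
  // large-variant implementation.
  auto decoder = parquet::MakeTypedDecoder<parquet::LargeByteArrayType>(
      parquet::Encoding::PLAIN, /*descr=*/nullptr);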
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 0a9864de6266a..2b027ff6ab38f 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -783,7 +783,8 @@ class PARQUET_EXPORT ArrowReaderProperties {
         batch_size_(kArrowDefaultBatchSize),
         pre_buffer_(false),
         cache_options_(::arrow::io::CacheOptions::Defaults()),
-        coerce_int96_timestamp_unit_(::arrow::TimeUnit::NANO) {}
+        coerce_int96_timestamp_unit_(::arrow::TimeUnit::NANO),
+        use_large_binary_variants_(false) {}

   /// \brief Set whether to use the IO thread pool to parse columns in parallel.
   ///
@@ -851,6 +852,14 @@ class PARQUET_EXPORT ArrowReaderProperties {
     return coerce_int96_timestamp_unit_;
   }

+  /// Set whether to use large binary variants for binary data
+  /// (default is false).
+  void set_use_large_binary_variants(bool use_large_binary_variants) {
+    use_large_binary_variants_ = use_large_binary_variants;
+  }
+
+  /// Return whether reading as large binary variants is enabled.
+  bool use_large_binary_variants() const { return use_large_binary_variants_; }
+
  private:
   bool use_threads_;
   std::unordered_set<int> read_dict_indices_;
@@ -859,6 +868,7 @@ class PARQUET_EXPORT ArrowReaderProperties {
   ::arrow::io::IOContext io_context_;
   ::arrow::io::CacheOptions cache_options_;
   ::arrow::TimeUnit::type coerce_int96_timestamp_unit_;
+  bool use_large_binary_variants_;
 };

 /// EXPERIMENTAL: Constructs the default ArrowReaderProperties
diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h
index d4d6a73f147fc..11eb0e703b7a7 100644
--- a/cpp/src/parquet/types.h
+++ b/cpp/src/parquet/types.h
@@ -761,6 +761,18 @@ using Int96Type = PhysicalType<Type::INT96>;
 using FloatType = PhysicalType<Type::FLOAT>;
 using DoubleType = PhysicalType<Type::DOUBLE>;
 using ByteArrayType = PhysicalType<Type::BYTE_ARRAY>;
+
+/*
+ * Parquet defines BYTE_ARRAY for variable-length string and binary values with a
+ * maximum length of 2^31 - 1. By default, the Arrow StringType and BinaryType are
+ * used to map the Parquet ByteArrayType. However, Arrow StringArray/BinaryArray
+ * uses int32_t to store the offset of each string/binary value in a concatenated
+ * buffer, which may overflow (though this is unlikely in most cases). Since Arrow
+ * also defines LargeStringType and LargeBinaryType, which use int64_t offsets,
+ * LargeByteArrayType below directs the Parquet reader/writer to use those large
+ * variants.
+ */
+struct LargeByteArrayType : public ByteArrayType {};
+
 using FLBAType = PhysicalType<Type::FIXED_LEN_BYTE_ARRAY>;

 template <typename Type>
diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing
index b2e7cc7551591..d79a0101d90df 160000
--- a/cpp/submodules/parquet-testing
+++ b/cpp/submodules/parquet-testing
@@ -1 +1 @@
-Subproject commit b2e7cc755159196e3a068c8594f7acbaecfdaaac
+Subproject commit d79a0101d90dfa3bbb10337626f57a3e8c4b5363
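A quick sanity check on the offset limits that motivate LargeByteArrayType: a
BinaryArray/StringArray stores int32_t end offsets, so the concatenated value buffer
of a single array is capped near 2^31 - 1 bytes (about 2 GiB). For example, 3 million
values of 1 KiB each already exceed the cap (3,000,000 * 1,024 = 3,072,000,000 >
2,147,483,647), while the int64_t offsets of LargeBinaryArray/LargeStringArray raise
the per-array limit to 2^63 - 1 bytes.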