From b315ebc991ce85f38a6e9ca11e7a3749e8778462 Mon Sep 17 00:00:00 2001
From: Joris Van den Bossche
Date: Fri, 6 Oct 2023 09:26:14 +0200
Subject: [PATCH] GH-36765: [Python][Dataset] Change default of pre_buffer to
 True for reading Parquet files (#37854)

### Rationale for this change

Enabling `pre_buffer` can give a significant speed-up on filesystems like S3, while it doesn't give a noticeable slowdown on local filesystems, based on benchmarks in the issue. Therefore, simply enabling it by default seems the best choice.

The option was already enabled by default in the `pyarrow.parquet.read_table` interface; this PR aligns the defaults when using `pyarrow.dataset` directly.

* Closes: #36765

Authored-by: Joris Van den Bossche
Signed-off-by: Joris Van den Bossche
---
 cpp/src/parquet/arrow/arrow_reader_writer_test.cc |  7 ++++---
 cpp/src/parquet/properties.h                      |  4 ++--
 python/pyarrow/_dataset_parquet.pyx               |  9 ++++++---
 python/pyarrow/parquet/core.py                    |  5 +++--
 python/pyarrow/tests/test_dataset.py              | 10 +++++-----
 5 files changed, 20 insertions(+), 15 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 8585b1ccf11aa..7a94b1f3a1c14 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -2354,10 +2354,11 @@ void TestGetRecordBatchReader(
 
 TEST(TestArrowReadWrite, GetRecordBatchReader) { TestGetRecordBatchReader(); }
 
-// Same as the test above, but using coalesced reads.
-TEST(TestArrowReadWrite, CoalescedReads) {
+// Same as the test above, but using non-coalesced reads.
+TEST(TestArrowReadWrite, NoneCoalescedReads) {
   ArrowReaderProperties arrow_properties = default_arrow_reader_properties();
-  arrow_properties.set_pre_buffer(true);
+  arrow_properties.set_pre_buffer(false);
+  arrow_properties.set_cache_options(::arrow::io::CacheOptions::Defaults());
 
   TestGetRecordBatchReader(arrow_properties);
 }
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 9344181d3f82f..c15ada0a8060f 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -836,8 +836,8 @@ class PARQUET_EXPORT ArrowReaderProperties {
       : use_threads_(use_threads),
         read_dict_indices_(),
         batch_size_(kArrowDefaultBatchSize),
-        pre_buffer_(false),
-        cache_options_(::arrow::io::CacheOptions::Defaults()),
+        pre_buffer_(true),
+        cache_options_(::arrow::io::CacheOptions::LazyDefaults()),
         coerce_int96_timestamp_unit_(::arrow::TimeUnit::NANO) {}
 
   /// \brief Set whether to use the IO thread pool to parse columns in parallel.
diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx
index cf5c44c1c964a..9d85142564011 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -666,10 +666,13 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
         Disabled by default.
     buffer_size : int, default 8192
         Size of buffered stream, if enabled. Default is 8KB.
-    pre_buffer : bool, default False
+    pre_buffer : bool, default True
         If enabled, pre-buffer the raw Parquet data instead of issuing one
         read per column chunk. This can improve performance on high-latency
-        filesystems.
+        filesystems (e.g. S3, GCS) by coalescing and issuing file reads in
+        parallel using a background I/O thread pool.
+        Set to False if you want to prioritize minimal memory usage
+        over maximum speed.
     thrift_string_size_limit : int, default None
         If not None, override the maximum total string size allocated
         when decoding Thrift structures. The default limit should be
@@ -688,7 +691,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
 
     def __init__(self, *, bint use_buffered_stream=False,
                  buffer_size=8192,
-                 bint pre_buffer=False,
+                 bint pre_buffer=True,
                  thrift_string_size_limit=None,
                  thrift_container_size_limit=None):
         self.init(shared_ptr[CFragmentScanOptions](
diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py
index e0cdfee62ef4b..a3e5ef76c99b6 100644
--- a/python/pyarrow/parquet/core.py
+++ b/python/pyarrow/parquet/core.py
@@ -1742,11 +1742,12 @@ class ParquetDataset:
         different partitioning schemes, etc.
     pre_buffer : bool, default True
         Coalesce and issue file reads in parallel to improve performance on
-        high-latency filesystems (e.g. S3). If True, Arrow will use a
+        high-latency filesystems (e.g. S3, GCS). If True, Arrow will use a
         background I/O thread pool. This option is only supported for
         use_legacy_dataset=False. If using a filesystem layer that itself
         performs readahead (e.g. fsspec's S3FS), disable readahead for best
-        results.
+        results. Set to False if you want to prioritize minimal memory usage
+        over maximum speed.
     coerce_int96_timestamp_unit : str, default None
         Cast timestamps that are stored in INT96 format to a particular
         resolution (e.g. 'ms'). Setting to None is equivalent to 'ns' and therefore INT96
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 39c3c43daea37..671405d1ee6a0 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -784,28 +784,28 @@ def test_parquet_scan_options():
     opts2 = ds.ParquetFragmentScanOptions(buffer_size=4096)
     opts3 = ds.ParquetFragmentScanOptions(
         buffer_size=2**13, use_buffered_stream=True)
-    opts4 = ds.ParquetFragmentScanOptions(buffer_size=2**13, pre_buffer=True)
+    opts4 = ds.ParquetFragmentScanOptions(buffer_size=2**13, pre_buffer=False)
     opts5 = ds.ParquetFragmentScanOptions(
         thrift_string_size_limit=123456,
         thrift_container_size_limit=987654,)
 
     assert opts1.use_buffered_stream is False
     assert opts1.buffer_size == 2**13
-    assert opts1.pre_buffer is False
+    assert opts1.pre_buffer is True
     assert opts1.thrift_string_size_limit == 100_000_000  # default in C++
     assert opts1.thrift_container_size_limit == 1_000_000  # default in C++
 
     assert opts2.use_buffered_stream is False
     assert opts2.buffer_size == 2**12
-    assert opts2.pre_buffer is False
+    assert opts2.pre_buffer is True
 
     assert opts3.use_buffered_stream is True
     assert opts3.buffer_size == 2**13
-    assert opts3.pre_buffer is False
+    assert opts3.pre_buffer is True
 
     assert opts4.use_buffered_stream is False
     assert opts4.buffer_size == 2**13
-    assert opts4.pre_buffer is True
+    assert opts4.pre_buffer is False
 
     assert opts5.thrift_string_size_limit == 123456
     assert opts5.thrift_container_size_limit == 987654
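
For reference (not part of the patch itself): a minimal sketch of how the changed default surfaces to `pyarrow.dataset` users, and how to opt back out of pre-buffering; the dataset path is hypothetical.

```python
import pyarrow.dataset as ds

# With this change, scanning a Parquet dataset pre-buffers by default
# (coalesced, parallel reads; helpful on high-latency filesystems like S3/GCS).
dataset = ds.dataset("my_dataset/", format="parquet")

# To prioritize minimal memory usage over speed, opt out explicitly by
# passing scan options with pre_buffer=False to the Parquet file format.
low_mem_format = ds.ParquetFileFormat(
    default_fragment_scan_options=ds.ParquetFragmentScanOptions(pre_buffer=False)
)
dataset_low_mem = ds.dataset("my_dataset/", format=low_mem_format)
```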