Commit c0ba0f2

Use fsspec.parquet for reading remote Parquet files (#385)
* Add automatic fsspec optimization for remote storage using fsspec.parquet (#369)
* Simplify fsspec.parquet usage by assuming availability and always using open_parquet_file (#386)
* Add explicit fsspec dependency
* Resolve specialization problems
* Simplify extraction of storage_options
* Resolve errors found by tests
* Remove now-unnecessary benchmark
* Optimize local-directory check.
1 parent c5cf5ee commit c0ba0f2
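
For orientation, here is a minimal sketch of what this change means for user code. Everything in it is illustrative rather than taken from the commit: the URL and column names are placeholders, and it assumes nested-pandas (with this commit), fsspec, and pyarrow are installed.

```python
# Hypothetical usage (placeholder URL): with this commit, reading a single
# remote Parquet file routes through fsspec.parquet.open_parquet_file,
# which precaches just the byte ranges needed for the requested columns.
import nested_pandas as npd

nf = npd.read_parquet(
    "https://example.com/catalog/file.parquet",  # remote single file
    columns=["a", "nested.flux"],                # partial nested loading still works
)
print(nf.columns)
```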

4 files changed: +184 -19 lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions

@@ -24,6 +24,7 @@ dependencies = [
     "pyarrow>=16", # remove struct_field_names() and struct_fields() when upgraded to 18+
     "Deprecated>=1.2.0",
     "wrapt>=1.12.1",
+    "fsspec>=2025.7.0",

     # NOTE: package PINNED at <0.3.0, see https://github.com/astronomy-commons/lsdb/issues/1047
     "universal_pathlib>=0.2,<0.3.0",

src/nested_pandas/nestedframe/io.py

Lines changed: 99 additions & 18 deletions
@@ -3,6 +3,7 @@

 from pathlib import Path

+import fsspec.parquet
 import pandas as pd
 import pyarrow as pa
 import pyarrow.fs
@@ -14,7 +15,9 @@
 from ..series.utils import table_to_struct_array
 from .core import NestedFrame

-# Use smaller block size for FSSPEC filesystems, it usually helps with parquet reads
+# Use smaller block size for these FSSPEC filesystems.
+# It usually helps with parquet read speed.
+FSSPEC_FILESYSTEMS = ("http", "https")
 FSSPEC_BLOCK_SIZE = 32 * 1024

@@ -25,19 +28,29 @@ def read_parquet(
     autocast_list: bool = False,
     **kwargs,
 ) -> NestedFrame:
-    """
-    Load a parquet object from a file path into a NestedFrame.
+    """Load a parquet object from a file path into a NestedFrame.

-    As a deviation from `pandas`, this function loads via
-    `pyarrow.parquet.read_table`, and then converts to a NestedFrame.
+    As a specialization of the ``pandas.read_parquet`` function, this
+    function loads the data via existing ``pyarrow`` or
+    ``fsspec.parquet`` methods, and then converts the data to a
+    NestedFrame.

     Parameters
     ----------
     data: str, list of str, Path, UPath, or file-like object
-        Path to the data or a file-like object. If a string is passed, it can be a single file name,
-        directory name, or a remote path (e.g., HTTP/HTTPS or S3). If a file-like object is passed,
-        it must support the `read` method. You can also pass the `filesystem` argument with
-        a `pyarrow.fs` object, which will be passed to `pyarrow.parquet.read_table()`.
+        Path to the data or a file-like object. If a string is passed,
+        it can be a single file name, directory name, or a remote path
+        (e.g., HTTP/HTTPS or S3). If a file-like object is passed, it
+        must support the ``read`` method. You can also pass a
+        ``filesystem`` keyword argument with a ``pyarrow.fs`` object, which will
+        be passed along to the underlying file-reading method.
+        A file URL can also be a path to a directory that contains multiple
+        partitioned parquet files. Both pyarrow and fastparquet support
+        paths to directories as well as file URLs. A directory path could be:
+        ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``.
+        If the path is to a single Parquet file, it will be loaded using
+        ``fsspec.parquet.open_parquet_file``, which has optimized handling
+        for remote Parquet files.
     columns : list, default=None
         If not None, only these columns will be read from the file.
     reject_nesting: list or str, default=None
@@ -57,6 +70,11 @@ def read_parquet(

     Notes
     -----
+    For paths to single Parquet files, this function uses
+    fsspec.parquet.open_parquet_file, which performs intelligent
+    precaching. This can significantly improve performance compared
+    to standard PyArrow reading on remote files.
+
     pyarrow supports partial loading of nested structures from parquet, for
     example ```pd.read_parquet("data.parquet", columns=["nested.a"])``` will
     load the "a" column of the "nested" column. Standard pandas/pyarrow

@@ -85,6 +103,7 @@ def read_parquet(

     >>> #Load only the "flux" sub-column of the "nested" column
     >>> nf = npd.read_parquet("path/to/file.parquet", columns=["a", "nested.flux"]) # doctest: +SKIP
+
     """

     # Type convergence for reject_nesting
@@ -93,14 +112,41 @@ def read_parquet(
     elif isinstance(reject_nesting, str):
         reject_nesting = [reject_nesting]

-    # First load through pyarrow
-    # If `filesystem` is specified - use it
-    if kwargs.get("filesystem") is not None:
-        table = pq.read_table(data, columns=columns, **kwargs)
-    # Otherwise convert with a special function
+    # For single Parquet file paths, we want to use
+    # `fsspec.parquet.open_parquet_file`. But for any other usage
+    # (which includes file-like objects, directories and lists
+    # thereof), we want to defer to `pq.read_table`.
+
+    # At the end of this block, `table` will contain the data.
+
+    # NOTE: the test for _is_local_dir is sufficient, because we're
+    # preserving a path to pq.read_table, which can read local
+    # directories, but not remote directories. Remote directories
+    # cannot be read by either of these methods.
+    if isinstance(data, str | Path | UPath) and not _is_local_dir(path_to_data := UPath(data)):
+        storage_options = _get_storage_options(path_to_data)
+        filesystem = kwargs.get("filesystem")
+        if not filesystem:
+            _, filesystem = _transform_read_parquet_data_arg(path_to_data)
+        with fsspec.parquet.open_parquet_file(
+            str(path_to_data),
+            columns=columns,
+            storage_options=storage_options,
+            fs=filesystem,
+            engine="pyarrow",
+        ) as parquet_file:
+            table = pq.read_table(parquet_file, columns=columns, **kwargs)
     else:
-        data, filesystem = _transform_read_parquet_data_arg(data)
-        table = pq.read_table(data, filesystem=filesystem, columns=columns, **kwargs)
+        # All other cases, including file-like objects, directories, and
+        # even lists of the foregoing.
+
+        # If `filesystem` is specified - use it, passing it as part of **kwargs
+        if kwargs.get("filesystem") is not None:
+            table = pq.read_table(data, columns=columns, **kwargs)
+        else:
+            # Otherwise convert with a special function
+            data, filesystem = _transform_read_parquet_data_arg(data)
+            table = pq.read_table(data, filesystem=filesystem, columns=columns, **kwargs)

     # Resolve partial loading of nested structures
     # Using pyarrow to avoid naming conflicts from partial loading ("flux" vs "lc.flux")
@@ -160,6 +206,41 @@ def read_parquet(
     return from_pyarrow(table, reject_nesting=reject_nesting, autocast_list=autocast_list)


+def _is_local_dir(path_to_data: UPath):
+    """Returns True if the given path refers to a local directory.
+
+    It's necessary to have this function, rather than simply checking
+    ``UPath(p).is_dir()``, because ``UPath.is_dir`` can be quite
+    expensive in the case of a remote file path that isn't a directory.
+    """
+    return path_to_data.protocol in ("", "file") and path_to_data.is_dir()
+
+
+def _get_storage_options(path_to_data: UPath):
+    """Get storage options for fsspec.parquet.open_parquet_file.
+
+    Parameters
+    ----------
+    path_to_data : UPath
+        The data source
+
+    Returns
+    -------
+    dict
+        Storage options (or None)
+    """
+    if path_to_data.protocol not in ("", "file"):
+        # Remote files of all types (s3, http)
+        storage_options = path_to_data.storage_options or {}
+        # For some cases, use smaller block size
+        if path_to_data.protocol in FSSPEC_FILESYSTEMS:
+            storage_options = {**storage_options, "block_size": FSSPEC_BLOCK_SIZE}
+        return storage_options
+
+    # Local files
+    return None
+
+
 def _transform_read_parquet_data_arg(data):
     """Transform `data` argument of read_parquet to pq.read_parquet's `source` and `filesystem`"""
     # Check if a list, run the function recursively and check that filesystems are all the same

@@ -204,8 +285,8 @@ def _transform_read_parquet_data_arg(data):
     # If it is a local path, use pyarrow's filesystem
     if upath.protocol == "":
         return upath.path, None
-    # If HTTP, change the default UPath object to use a smaller block size
-    if upath.protocol in ("http", "https"):
+    # Change the default UPath object to use a smaller block size in some cases
+    if upath.protocol in FSSPEC_FILESYSTEMS:
         upath = UPath(upath, block_size=FSSPEC_BLOCK_SIZE)
     return upath.path, upath.fs
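
To make the new read path concrete, here is a self-contained sketch of the same technique outside nested-pandas. The URL is a placeholder; the block size and engine mirror FSSPEC_BLOCK_SIZE and the open_parquet_file call in the diff above.

```python
# Sketch of the optimized remote read this diff wires up: open_parquet_file
# fetches only the footer plus the byte ranges of the requested columns,
# then pq.read_table parses the cached buffer.
import fsspec.parquet
import pyarrow.parquet as pq

url = "https://example.com/catalog/file.parquet"  # placeholder remote file
storage_options = {"block_size": 32 * 1024}       # FSSPEC_BLOCK_SIZE for http(s)

with fsspec.parquet.open_parquet_file(
    url,
    columns=["a"],                    # precache only the byte ranges for "a"
    storage_options=storage_options,
    engine="pyarrow",
) as parquet_file:
    table = pq.read_table(parquet_file, columns=["a"])
print(table.num_rows)
```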

tests/nested_pandas/nestedframe/test_io.py

Lines changed: 61 additions & 1 deletion

@@ -12,7 +12,12 @@
 import pytest
 from nested_pandas import NestedFrame, read_parquet
 from nested_pandas.datasets import generate_data
-from nested_pandas.nestedframe.io import _transform_read_parquet_data_arg, from_pyarrow
+from nested_pandas.nestedframe.io import (
+    FSSPEC_BLOCK_SIZE,
+    _get_storage_options,
+    _transform_read_parquet_data_arg,
+    from_pyarrow,
+)
 from pandas.testing import assert_frame_equal
 from upath import UPath

@@ -399,3 +404,58 @@ def test__transform_read_parquet_data_arg():
             "https://data.lsdb.io/hats/gaia_dr3/gaia/dataset/Norder=2/Dir=0/Npix=0.parquet",
         ]
     )
+
+
+def test_read_parquet_with_fsspec_optimization():
+    """Test that read_parquet automatically uses fsspec optimization for remote files."""
+    # Test with local file (should not use fsspec optimization)
+    local_path = "tests/test_data/nested.parquet"
+
+    # Test basic reading - local files should work as before
+    nf1 = read_parquet(local_path)
+
+    # Test with additional kwargs
+    nf2 = read_parquet(local_path, columns=["a", "nested.flux"], use_threads=True)
+
+    assert len(nf2) <= len(nf1)  # filtered columns
+    assert "a" in nf2.columns
+    assert "nested" in nf2.columns
+
+
+def test_docstring_includes_fsspec_notes():
+    """Test that the docstring mentions the automatic fsspec optimization."""
+    docstring = read_parquet.__doc__
+    assert "fsspec" in docstring
+    assert "remote" in docstring.lower()
+
+
+def test__get_storage_options():
+    """Test _get_storage_options function with various input types."""
+    local_path = "tests/test_data/nested.parquet"
+
+    # Test with UPath objects (local files)
+    local_upath = UPath(local_path)
+    storage_opts = _get_storage_options(local_upath)
+    assert storage_opts is None  # Local UPath should have no storage options
+
+    # Test with UPath objects (HTTP)
+    http_url = "http://example.com/data.parquet"
+    http_upath = UPath(http_url)
+    storage_opts = _get_storage_options(http_upath)
+    assert storage_opts is not None
+    assert storage_opts.get("block_size") == FSSPEC_BLOCK_SIZE
+
+    # Test with UPath objects (HTTPS)
+    https_url = "https://example.com/data.parquet"
+    https_upath = UPath(https_url)
+    storage_opts = _get_storage_options(https_upath)
+    assert storage_opts is not None
+    assert storage_opts.get("block_size") == FSSPEC_BLOCK_SIZE
+
+    # Test with UPath objects (S3)
+    s3_url = "s3://bucket/path/data.parquet"
+    s3_upath = UPath(s3_url)
+    storage_opts = _get_storage_options(s3_upath)
+    assert storage_opts is not None
+    # S3 should NOT have the block_size override (only HTTP/HTTPS)
+    assert storage_opts.get("block_size") != FSSPEC_BLOCK_SIZE
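
The helper's contract, as pinned down by these assertions, can also be seen interactively. A sketch (expected values follow from the test above; the s3 example additionally assumes s3fs is installed):

```python
# Protocol-driven behavior of _get_storage_options, per the test above.
from upath import UPath

from nested_pandas.nestedframe.io import FSSPEC_BLOCK_SIZE, _get_storage_options

print(_get_storage_options(UPath("local/file.parquet")))             # None (local path)
print(_get_storage_options(UPath("https://example.com/x.parquet")))  # {'block_size': 32768}
s3_opts = _get_storage_options(UPath("s3://bucket/x.parquet"))       # assumes s3fs installed
print(s3_opts.get("block_size") == FSSPEC_BLOCK_SIZE)                # False: no s3 override
```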

tests/nested_pandas/nestedframe/test_nestedframe.py

Lines changed: 23 additions & 0 deletions

@@ -1177,6 +1177,29 @@ def make_id(row, prefix_str):
         get_max, columns=["packed.c", "packed.d"], output_names=["only_one_name"], row_container="args"
     )

+    # Test output_names as a string (single output)
+    def get_single_max(row):
+        return row["packed.c"].max()
+
+    result = nf.map_rows(get_single_max, columns=["packed.c"], output_names="max_c")
+    assert len(result) == len(nf)
+    assert list(result.columns) == ["max_c"]
+    for i in range(len(result)):
+        assert result["max_c"].values[i] == expected_max_c[i]
+
+    # Test output_names as a list (multiple outputs)
+    def get_max_pair(row):
+        return pd.Series([row["packed.c"].max(), row["packed.d"].max()], index=["max_col1", "max_col2"])
+
+    result = nf.map_rows(
+        get_max_pair, columns=["packed.c", "packed.d"], output_names=["custom_max1", "custom_max2"]
+    )
+    assert len(result) == len(nf)
+    assert list(result.columns) == ["custom_max1", "custom_max2"]
+    for i in range(len(result)):
+        assert result["custom_max1"].values[i] == expected_max_c[i]
+        assert result["custom_max2"].values[i] == expected_max_d[i]
+
     # Verify that append_columns=True works as expected.
     # Ensure that even with non-unique indexes, the final result retains
     # the original index (nested-pandas#301)
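
For readers unfamiliar with map_rows, a hedged usage sketch of the two output_names forms these tests exercise. The frame comes from generate_data, whose nested column is assumed (as in the read_parquet examples above) to be named "nested" with a "flux" field.

```python
# output_names as a string (one column) vs. a list (several columns).
import pandas as pd

from nested_pandas.datasets import generate_data

nf = generate_data(5, 3)  # small NestedFrame with a "nested" column

single = nf.map_rows(
    lambda row: row["nested.flux"].max(),
    columns=["nested.flux"],
    output_names="max_flux",
)
print(list(single.columns))  # ["max_flux"]

pair = nf.map_rows(
    lambda row: pd.Series(
        [row["nested.flux"].max(), row["nested.flux"].min()],
        index=["hi", "lo"],
    ),
    columns=["nested.flux"],
    output_names=["max_flux", "min_flux"],
)
print(list(pair.columns))  # ["max_flux", "min_flux"]
```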

0 commit comments
