Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 46 additions & 14 deletions pins/boards.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,13 @@ def pin_fetch(self, name: str, version: str | None = None) -> Meta:
# so they could pin_fetch and then examine the result, a la pin_download
return meta

def pin_read(self, name, version: str | None = None, hash: str | None = None):
def pin_read(
self,
name,
version: str | None = None,
hash: str | None = None,
type: str | None = None,
):
"""Return the data stored in a pin.

Parameters
Expand All @@ -200,7 +206,8 @@ def pin_read(self, name, version: str | None = None, hash: str | None = None):
hash:
A hash used to validate the retrieved pin data. If specified, it is
compared against the `pin_hash` field retrieved by [](`~pins.boards.BaseBoard.pin_meta`).

type:
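The file type to read the pin as (e.g. "csv" or "parquet"). If omitted, the
type recorded in the pin's metadata is used; when the pin was saved in several
formats, that is the first type in the list.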
"""
meta = self.pin_fetch(name, version)

Expand All @@ -216,14 +223,14 @@ def pin_read(self, name, version: str | None = None, hash: str | None = None):
pin_name = self.path_to_pin(name)

return self._load_data(
meta, self.construct_path([pin_name, meta.version.version])
meta, self.construct_path([pin_name, meta.version.version]), type=type
)

def _pin_store(
self,
x,
name: str | None = None,
type: str | None = None,
type: str | list[str] | None = None,
title: str | None = None,
description: str | None = None,
metadata: Mapping | None = None,
Expand Down Expand Up @@ -339,7 +346,7 @@ def pin_write(
self,
x,
name: str | None = None,
type: str | None = None,
type: str | list[str] | None = None,
title: str | None = None,
description: str | None = None,
metadata: Mapping | None = None,
Expand All @@ -357,8 +364,9 @@ def pin_write(
name:
Pin name.
type:
File type used to save `x` to disk. May be "csv", "arrow", "parquet",
"joblib", or "json".
File type(s) used to save `x` to disk. May be a single string or a list of strings.
Supported types include "csv", "arrow", "parquet", "joblib", and "json".
When a list is provided, the object will be saved in each of the specified formats.
title:
A title for the pin; most important for shared boards so that others
can understand what the pin contains. If omitted, a brief description
Expand Down Expand Up @@ -689,7 +697,7 @@ def prepare_pin_version(
pin_dir_path,
x,
name: str | None = None,
type: str | None = None,
type: str | list[str] | None = None,
title: str | None = None,
description: str | None = None,
metadata: Mapping | None = None,
Expand Down Expand Up @@ -720,7 +728,7 @@ def _create_meta(
pin_dir_path,
x: Adaptor | Any,
name: str | None = None,
type: str | None = None,
type: str | list[str] | None = None,
title: str | None = None,
description: str | None = None,
metadata: Mapping | None = None,
Expand Down Expand Up @@ -750,13 +758,27 @@ def _create_meta(
p_obj = str(Path(pin_dir_path) / name)
else:
p_obj = str(Path(pin_dir_path) / object_name)
# file is saved locally in order to hash, calc size
file_names = save_data(x, p_obj, type, apply_suffix)

# Handle multiple types
type_value = [type] if isinstance(type, str) else type
file_names = []

# Save each type
for t in type_value:
# file is saved locally in order to hash, calc size
files = save_data(x, p_obj, t, apply_suffix)
# If save_data returns a list, extend file_names with it
if isinstance(files, list):
file_names.extend(files)
else:
file_names.append(files)

# Always use a list for the type value, even with a single type

meta = self.meta_factory.create(
pin_dir_path,
file_names,
type,
type_value,
title=title,
description=description,
user=metadata,
Expand All @@ -780,10 +802,14 @@ def _extract_search_meta(self, meta):

# data loading ------------------------------------------------------------

def _load_data(self, meta, pin_version_path):
def _load_data(self, meta, pin_version_path, type: str | None = None):
"""Return the data object stored by a pin (e.g. a DataFrame)."""
return load_data(
meta, self.fs, pin_version_path, allow_pickle_read=self.allow_pickle_read
meta,
self.fs,
pin_version_path,
allow_pickle_read=self.allow_pickle_read,
type=type,
)

# filesystem and cache methods --------------------------------------------
Expand Down Expand Up @@ -1161,6 +1187,12 @@ def _open_pin_meta(self, path):
return f, local

def validate_pin_name(self, name) -> None:
# Check if name is None or not a string
if name is None or not isinstance(name, str):
raise ValueError(
f"Pin name must be a string, got {type(name).__name__}: {name}"
)

# this should be the default behavior, expecting a full pin name.
# but because the tests use short names, we allow it to be disabled via config
if not get_allow_rsc_short_name() and name.count("/") != 1:
Expand Down
72 changes: 59 additions & 13 deletions pins/drivers.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def load_data(
fs,
path_to_version: "str | None" = None,
allow_pickle_read: "bool | None" = None,
type: "str | None" = None,
):
"""Return loaded data, based on meta type.
Parameters
Expand All @@ -56,59 +57,104 @@ def load_data(
"""

# TODO: extensible loading with deferred importing
if meta.type in UNSAFE_TYPES and not get_allow_pickle_read(allow_pickle_read):

# If a specific type is provided, use that
if type is not None:
pin_type = type
else:
# If meta.type is a list, use the first type in the list
if isinstance(meta.type, list):
pin_type = meta.type[0]
else:
pin_type = meta.type

if pin_type in UNSAFE_TYPES and not get_allow_pickle_read(allow_pickle_read):
raise PinsInsecureReadError(
f"Reading pin type {meta.type} involves reading a pickle file, so is NOT secure."
f"Reading pin type {pin_type} involves reading a pickle file, so is NOT secure."
f"Set the allow_pickle_read=True when creating the board, or the "
f"{PINS_ENV_INSECURE_READ}=1 environment variable.\n"
"See:\n"
" * https://docs.python.org/3/library/pickle.html \n"
" * https://scikit-learn.org/stable/modules/model_persistence.html#security-maintainability-limitations"
)

with load_file(meta.file, fs, path_to_version, meta.type) as f:
if meta.type == "csv":
# If meta.file is a list, find the appropriate file for the requested type
if isinstance(meta.file, (list, tuple)):
# For each file type, expect the filename to end with the type extension
file_extension_map = {
"csv": ".csv",
"arrow": ".arrow",
"feather": ".feather",
"parquet": ".parquet",
"json": ".json",
"joblib": ".joblib",
"rds": ".rds",
}

# Look for a file with the correct extension
ext = file_extension_map.get(pin_type)
if ext:
matching_files = [f for f in meta.file if f.endswith(ext)]
if matching_files:
filename = matching_files[0]
else:
# If no exact match found, try to find a file containing the pin type in its name
matching_files = [f for f in meta.file if pin_type in f.lower()]
if matching_files:
filename = matching_files[0]
else:
raise ValueError(
f"No file found for type {pin_type}. Available files: {meta.file}"
)
else:
# Fall back to the first file if we don't know the extension mapping
filename = meta.file[0]
else:
filename = meta.file

with load_file(filename, fs, path_to_version, pin_type) as f:
if pin_type == "csv":
import pandas as pd

return pd.read_csv(f)

elif meta.type == "arrow":
elif pin_type == "arrow":
import pandas as pd

return pd.read_feather(f)

elif meta.type == "feather":
elif pin_type == "feather":
import pandas as pd

return pd.read_feather(f)

elif meta.type == "parquet":
elif pin_type == "parquet":
import pandas as pd

return pd.read_parquet(f)

elif meta.type == "table":
elif pin_type == "table":
import pandas as pd

return pd.read_csv(f)

elif meta.type == "joblib":
elif pin_type == "joblib":
import joblib

return joblib.load(f)

elif meta.type == "json":
elif pin_type == "json":
import json

return json.load(f)

elif meta.type == "file":
elif pin_type == "file":
raise NotImplementedError(
"Methods like `.pin_read()` are not able to read 'file' type pins."
" Use `.pin_download()` to download the file."
)

elif meta.type == "rds":
elif pin_type == "rds":
try:
import rdata # pyright: ignore[reportMissingImports]

Expand All @@ -118,7 +164,7 @@ def load_data(
"Install the 'rdata' package to attempt to convert 'rds' files into Python objects."
)

raise NotImplementedError(f"No driver for type {meta.type}")
raise NotImplementedError(f"No driver for type {pin_type}")


def save_data(
Expand Down
8 changes: 3 additions & 5 deletions pins/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class MetaRaw:
"""

file: str | Sequence[str] | None
type: str
type: str | Sequence[str]
name: str


Expand All @@ -53,7 +53,7 @@ class Meta:
file_size:
The total size of the files in the pin.
type:
The type of pin data stored. This is used to determine how to read / write it.
The type(s) of pin data stored. This is used to determine how to read / write it.
api_version:
The internal version of the metadata format.
name:
Expand All @@ -77,16 +77,14 @@ class Meta:

file: str | Sequence[str]
file_size: int
type: str

type: str | list[str]
api_version: int

# In the metadata yaml, the created field uses a custom format, so
# we need a version object in order to render it. You can think of
# the version here as "the thing that was used to create version_name,
# pin_hash, created, etc.."
version: VersionRaw

tags: list[str] | None = None
name: str | None = None
user: Mapping = field(default_factory=dict)
Expand Down
46 changes: 46 additions & 0 deletions pins/tests/test_boards.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,52 @@ def test_board_pin_write_type(board, obj, type_, request):
obj == dst_obj


@skip_if_dbc
def test_board_pin_read_with_specified_type(board):
    """A csv pin reads back unchanged both with and without an explicit `type`."""
    pin_name = "test_read_pin_type"
    expected = pd.DataFrame({"x": [1, 2, 3], "y": ["a", "b", "c"]})

    # NOTE(review): indentation was lost in the scraped diff; the reads are kept
    # inside the env override here (harmless for csv) -- confirm against the PR.
    with rm_env(PINS_ENV_INSECURE_READ):
        os.environ[PINS_ENV_INSECURE_READ] = "1"
        meta = board.pin_write(
            expected, pin_name, type="csv", title="test read with type"
        )

        # Default read: no `type` argument is passed.
        default_read = board.pin_read(pin_name)
        # NOTE(review): `_create_meta` appears to wrap a single type in a list
        # before building the metadata; confirm that `meta.type` still comes
        # back as the plain string here.
        assert meta.type == "csv"
        assert default_read.equals(expected)

        # Explicit read: asking for the stored type must give the same frame.
        typed_read = board.pin_read(pin_name, type="csv")
        assert typed_read.equals(expected)


@skip_if_dbc
def test_board_pin_write_multiple_types(board):
    """A pin written with a list of types records the list and reads back in each format."""
    pin_name = "test_multi_type"
    expected = pd.DataFrame({"x": [1, 2, 3], "y": ["a", "b", "c"]})

    # NOTE(review): indentation was lost in the scraped diff; the reads are kept
    # inside the env override here (harmless for csv/parquet) -- confirm against
    # the PR.
    with rm_env(PINS_ENV_INSECURE_READ):
        os.environ[PINS_ENV_INSECURE_READ] = "1"
        meta = board.pin_write(
            expected, pin_name, type=["csv", "parquet"], title="multi-type pin"
        )

        # The metadata keeps the whole list of types, not just a single one.
        assert meta.type == ["csv", "parquet"]

        # With no `type` argument the read falls back to the pin's recorded type.
        default_read = board.pin_read(pin_name)
        assert default_read.equals(expected)

        # A non-default format can be requested by name.
        parquet_read = board.pin_read(pin_name, type="parquet")
        assert parquet_read.equals(expected)


@skip_if_dbc
def test_board_pin_read_insecure_fail_default(board):
board.pin_write({"a": 1}, "test_pin", type="joblib", title="some title")
Expand Down
Loading